From 29eb668d0f5a8489878d3850083c77370d76fe64 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Tue, 27 Aug 2019 17:40:23 +0200 Subject: [PATCH 01/17] gemini: lazy generation of partition keys --- cmd/gemini/generators.go | 6 +- cmd/gemini/jobs.go | 37 ++++---- cmd/gemini/root.go | 22 +++-- generator.go | 197 ++++++++++++++++----------------------- generator_test.go | 2 +- inflight.go | 39 ++++++++ schema.go | 25 +++-- source.go | 40 ++++++++ 8 files changed, 217 insertions(+), 151 deletions(-) create mode 100644 inflight.go create mode 100644 source.go diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go index fe842cfa..bb5f1ae9 100644 --- a/cmd/gemini/generators.go +++ b/cmd/gemini/generators.go @@ -5,7 +5,7 @@ import ( "go.uber.org/zap" ) -func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generators { +func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generator { partitionRangeConfig := gemini.PartitionRangeConfig{ MaxBlobLength: schemaConfig.MaxBlobLength, MinBlobLength: schemaConfig.MinBlobLength, @@ -13,9 +13,9 @@ func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, d MinStringLength: schemaConfig.MinStringLength, } - var gs []*gemini.Generators + var gs []*gemini.Generator for _, table := range schema.Tables { - gCfg := &gemini.GeneratorsConfig{ + gCfg := &gemini.GeneratorConfig{ Partitions: partitionRangeConfig, Size: actors, DistributionSize: distributionSize, diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index e03b7019..264a97aa 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math" "sync" "time" @@ -15,7 +16,7 @@ import ( // MutationJob continuously applies mutations against the database // for as long as the pump is active. -func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { defer wg.Done() schemaConfig := &schemaCfg logger = logger.Named("mutation_job") @@ -29,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, } else { mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger) } - if i%1000 == 0 { + if i%100 == 0 { c <- testStatus testStatus = Status{} } @@ -43,7 +44,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, // ValidationJob continuously applies validations against the database // for as long as the pump is active. 
-func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { defer wg.Done() schemaConfig := &schemaCfg logger = logger.Named("validation_job") @@ -53,7 +54,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou for hb := range pump { hb.await() validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger) - if i%1000 == 0 { + if i%100 == 0 { c <- testStatus testStatus = Status{} } @@ -67,7 +68,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou // WarmupJob continuously applies mutations against the database // for as long as the pump is active or the supplied duration expires. -func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { defer wg.Done() schemaConfig := &schemaCfg testStatus := Status{} @@ -96,7 +97,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s } } -func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generators, result chan Status, logger *zap.Logger) { +func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { defer done.Done() var finished sync.WaitGroup finished.Add(1) @@ -106,17 +107,21 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, workerCtx, _ := context.WithCancel(context.Background()) workers.Add(len(schema.Tables) * int(actors)) - partitionRangeConfig := gemini.PartitionRangeConfig{ - MaxBlobLength: schemaConfig.MaxBlobLength, - MinBlobLength: schemaConfig.MinBlobLength, - MaxStringLength: schemaConfig.MaxStringLength, - MinStringLength: schemaConfig.MinStringLength, - } - for j, table := range schema.Tables { + g := generators[j] + delta := math.MaxUint64 % actors for i := 0; i < int(actors); i++ { + start := uint64(i) + partitionRangeConfig := gemini.PartitionRangeConfig{ + Left: start * delta, + Right: start*delta + delta, + MaxBlobLength: schemaConfig.MaxBlobLength, + MinBlobLength: schemaConfig.MinBlobLength, + MaxStringLength: schemaConfig.MaxStringLength, + MinStringLength: schemaConfig.MinStringLength, + } r := rand.New(rand.NewSource(seed)) - go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, 
partitionRangeConfig, generators[j].Get(i), result, mode, warmup, logger) + go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger) } } @@ -168,7 +173,7 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta } } -func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, deletes bool, logger *zap.Logger) { +func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes) if err != nil { logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) @@ -204,7 +209,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig } } -func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, logger *zap.Logger) { +func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, logger *zap.Logger) { checkStmt := schema.GenCheckStmt(table, source, r, p) if checkStmt == nil { if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index b0352bcc..61810cbc 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" _ "net/http/pprof" "os" @@ -87,7 +88,7 @@ func interactive() bool { return !nonInteractive } -type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Source, chan Status, string, time.Duration, *zap.Logger) +type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger) func readSchema(confFile string) (*gemini.Schema, error) { byteValue, err := ioutil.ReadFile(confFile) @@ -211,6 +212,11 @@ func run(cmd *cobra.Command, args []string) error { pump := createPump(10000, logger) generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger) go func() { + defer func() { + for _, g := range generators { + g.Stop() + } + }() defer done.Done() var sp *spinner.Spinner = nil if interactive() { @@ -243,8 +249,12 @@ func createFile(fname string, def *os.File) (*os.File, error) { } const ( - stdDistMean = 0.5 - oneStdDev = 0.341 + /* + stdDistMean = 0.5 + oneStdDev = 0.341 + */ + stdDistMean = math.MaxUint64 / 2 + oneStdDev = 0.341 * math.MaxUint64 ) func createDistributionFunc(distribution string, size, seed uint64, mu, sigma float64) (gemini.DistributionFunc, error) { @@ -273,7 +283,7 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl } } -func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators 
[]*gemini.Generators, result chan Status, logger *zap.Logger) { +func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { if warmup > 0 { done := &sync.WaitGroup{} done.Add(1) @@ -418,8 +428,8 @@ func init() { rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") rootCmd.Flags().DurationVarP(&maxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond, "Duration between attempts to apply a mutation for example 10ms or 1s") rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 2000, "Number of reused buffered partition keys") - rootCmd.Flags().Uint64VarP(&distributionSize, "distribution-size", "", 1000000, "Number of partition keys each worker creates") - rootCmd.Flags().StringVarP(&partitionKeyDistribution, "partition-key-distribution", "", "uniform", "Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|exponential") + rootCmd.Flags().Uint64VarP(&distributionSize, "distribution-size", "", math.MaxUint64, "Number of partition keys each worker creates") + rootCmd.Flags().StringVarP(&partitionKeyDistribution, "partition-key-distribution", "", "uniform", "Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|zipf") rootCmd.Flags().Float64VarP(&normalDistMean, "normal-dist-mean", "", stdDistMean, "Mean of the normal distribution") rootCmd.Flags().Float64VarP(&normalDistSigma, "normal-dist-sigma", "", oneStdDev, "Sigma of the normal distribution, defaults to one standard deviation ~0.341") rootCmd.Flags().StringVarP(&tracingOutFile, "tracing-outfile", "", "", "Specify the file to which tracing information gets written. Two magic names are available, 'stdout' and 'stderr'. By default tracing is disabled.") diff --git a/generator.go b/generator.go index eec91884..f453404f 100644 --- a/generator.go +++ b/generator.go @@ -2,76 +2,30 @@ package gemini import ( "sync" - "time" - "github.com/scylladb/gemini/murmur" "github.com/scylladb/go-set/u64set" + + "github.com/scylladb/gemini/murmur" "go.uber.org/zap" "golang.org/x/exp/rand" ) type DistributionFunc func() uint64 -type Source struct { - values []ValueWithToken - idxFunc func() uint64 - oldValues chan ValueWithToken - inFlight syncU64set -} - -//Get returns a new value and ensures that it's corresponding token -//is not already in-flight. -func (s *Source) Get() (ValueWithToken, bool) { - for { - v := s.pick() - if s.inFlight.addIfNotPresent(v.Token) { - return v, true - } - } -} - -//GetOld returns a previously used value and token or a new if -//the old queue is empty. -func (s *Source) GetOld() (ValueWithToken, bool) { - select { - case v, ok := <-s.oldValues: - return v, ok - default: - // There are no old values so we generate a new - return s.Get() - } -} - -// GiveOld returns the supplied value for later reuse unless the value -//is empty in which case it removes the corresponding token from the -// in-flight tracking. 
-func (s *Source) GiveOld(v ValueWithToken) { - if len(v.Value) == 0 { - s.inFlight.delete(v.Token) - return - } - select { - case s.oldValues <- v: - default: - // Old source is full, just drop the value - } -} - -func (s *Source) pick() ValueWithToken { - return s.values[s.idxFunc()] -} - -type Generators struct { - generators []*Source +type Generator struct { + sources []*Source + inFlight *syncU64set size uint64 distributionSize uint64 table *Table partitionsConfig PartitionRangeConfig seed uint64 + idxFunc func() uint64 + doneCh chan struct{} logger *zap.Logger } -type GeneratorsConfig struct { +type GeneratorConfig struct { Partitions PartitionRangeConfig DistributionSize uint64 DistributionFunc DistributionFunc @@ -80,31 +34,46 @@ type GeneratorsConfig struct { PkUsedBufferSize uint64 } -func NewGenerator(table *Table, config *GeneratorsConfig, logger *zap.Logger) *Generators { - generators := make([]*Source, config.Size) +func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator { + sources := make([]*Source, config.Size) for i := uint64(0); i < config.Size; i++ { - generators[i] = &Source{ - values: make([]ValueWithToken, 0, config.DistributionSize), - idxFunc: config.DistributionFunc, + done := &sync.WaitGroup{} + done.Add(1) + sources[i] = &Source{ + values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), - inFlight: syncU64set{pks: u64set.New()}, } } - gs := &Generators{ - generators: generators, + gs := &Generator{ + sources: sources, + inFlight: &syncU64set{pks: u64set.New(), mu: &sync.RWMutex{}}, size: config.Size, distributionSize: config.DistributionSize, table: table, partitionsConfig: config.Partitions, seed: config.Seed, + idxFunc: config.DistributionFunc, + doneCh: make(chan struct{}, 1), logger: logger, } - gs.create() + gs.start() return gs } -func (gs Generators) Get(idx int) *Source { - return gs.generators[idx] +func (g Generator) Get() (ValueWithToken, bool) { + source := g.sources[g.idxFunc()%g.size] + for { + v := source.pick() + if g.inFlight.addIfNotPresent(v.Token) { + return v, true + } + } +} + +//GetOld returns a previously used value and token or a new if +//the old queue is empty. 
+func (g Generator) GetOld() (ValueWithToken, bool) { + return g.sources[g.idxFunc()%g.size].getOld() } type ValueWithToken struct { @@ -112,38 +81,58 @@ type ValueWithToken struct { Value Value } -func (gs *Generators) create() { - gs.logger.Info("generating partition keys, this can take a while", zap.Uint64("distribution_size", gs.distributionSize)) - start := time.Now() - routingKeyCreator := &RoutingKeyCreator{} - r := rand.New(rand.NewSource(gs.seed)) - fullSources := u64set.New() - for { - values := gs.createPartitionKeyValues(r) - hash := hash(routingKeyCreator, gs.table, values) - idx := hash % gs.size - source := gs.generators[idx] - if fullSources.Has(idx) { - continue - } - if uint64(len(source.values)) < gs.distributionSize { - source.values = append(source.values, ValueWithToken{Token: hash, Value: values}) - } - if uint64(len(source.values)) == gs.distributionSize { - gs.logger.Debug("partial generation", zap.Uint64("source", idx), zap.Int("size", len(source.values))) - fullSources.Add(idx) - } - if fullSources.Size() == len(gs.generators) { - gs.logger.Info("finished generating partition ids", zap.Duration("duration", time.Since(start))) - break - } +// GiveOld returns the supplied value for later reuse unless the value +//is empty in which case it removes the corresponding token from the +// in-flight tracking. +func (g *Generator) GiveOld(v ValueWithToken) { + source := g.sources[v.Token%g.size] + if len(v.Value) == 0 { + g.inFlight.delete(v.Token) + return + } + source.giveOld(v) +} + +func (g *Generator) Stop() { + g.doneCh <- struct{}{} + for _, s := range g.sources { + close(s.oldValues) } } -func (gs *Generators) createPartitionKeyValues(r *rand.Rand) []interface{} { +func (g *Generator) start() { + go func() { + g.logger.Info("starting partition key generation loop") + routingKeyCreator := &RoutingKeyCreator{} + r := rand.New(rand.NewSource(g.seed)) + for { + values := g.createPartitionKeyValues(r) + hash := hash(routingKeyCreator, g.table, values) + idx := hash % g.size + source := g.sources[idx] + select { + case source.values <- ValueWithToken{Token: hash, Value: values}: + case <-g.doneCh: + g.logger.Info("stopping partition key generation loop") + return + default: + //time.Sleep(10 * time.Microsecond) + /* + for pos, s := range g.sources { + fmt.Printf("%d, values=%d, old=%d, ", pos, len(s.values), len(s.oldValues)) + } + fmt.Println() + */ + // source is full, drop it this time + } + } + }() +} + +func (g *Generator) createPartitionKeyValues(r *rand.Rand) []interface{} { var values []interface{} - for _, pk := range gs.table.PartitionKeys { - values = append(values, pk.Type.GenValue(r, gs.partitionsConfig)...) + for _, pk := range g.table.PartitionKeys { + values = append(values, pk.Type.GenValue(r, g.partitionsConfig)...) 
} return values } @@ -152,25 +141,3 @@ func hash(rkc *RoutingKeyCreator, t *Table, values []interface{}) uint64 { b, _ := rkc.CreateRoutingKey(t, values) return uint64(murmur.Murmur3H1(b)) } - -type syncU64set struct { - pks *u64set.Set - mu sync.Mutex -} - -func (s *syncU64set) delete(v uint64) bool { - s.mu.Lock() - _, found := s.pks.Pop2() - s.mu.Unlock() - return found -} - -func (s *syncU64set) addIfNotPresent(v uint64) bool { - s.mu.Lock() - defer s.mu.Unlock() - if s.pks.Has(v) { - return false - } - s.pks.Add(v) - return true -} diff --git a/generator_test.go b/generator_test.go index f3bb81c3..29672ff1 100644 --- a/generator_test.go +++ b/generator_test.go @@ -14,7 +14,7 @@ func TestGenerator(t *testing.T) { PartitionKeys: createPkColumns(1, "pk"), } var current uint64 - cfg := &GeneratorsConfig{ + cfg := &GeneratorConfig{ Partitions: PartitionRangeConfig{ MaxStringLength: 10, MinStringLength: 0, diff --git a/inflight.go b/inflight.go new file mode 100644 index 00000000..ce2b702d --- /dev/null +++ b/inflight.go @@ -0,0 +1,39 @@ +package gemini + +import ( + "sync" + + "github.com/scylladb/go-set/u64set" +) + +//syncU64set is a u64set protected by a sync.RWLock +//It could potentially become contended and then we +//should replace it with a sharded version. +type syncU64set struct { + pks *u64set.Set + mu *sync.RWMutex +} + +func (s *syncU64set) delete(v uint64) bool { + s.mu.Lock() + _, found := s.pks.Pop2() + s.mu.Unlock() + return found +} + +func (s *syncU64set) addIfNotPresent(v uint64) bool { + s.mu.RLock() + if s.pks.Has(v) { + s.mu.RUnlock() + return false + } + s.mu.RUnlock() + s.mu.Lock() + defer s.mu.Unlock() + if s.pks.Has(v) { + // double check + return false + } + s.pks.Add(v) + return true +} diff --git a/schema.go b/schema.go index 6a262427..9fae8a60 100644 --- a/schema.go +++ b/schema.go @@ -352,6 +352,8 @@ type Schema struct { } type PartitionRangeConfig struct { + Left uint64 + Right uint64 MaxBlobLength int MinBlobLength int MaxStringLength int @@ -536,7 +538,7 @@ func (s *Schema) GetCreateSchema() []string { return stmts } -func (s *Schema) GenInsertStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -578,7 +580,7 @@ func (s *Schema) GenInsertStmt(t *Table, source *Source, r *rand.Rand, p Partiti }, nil } -func (s *Schema) GenInsertJsonStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertJsonStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -636,7 +638,7 @@ func (s *Schema) GenInsertJsonStmt(t *Table, source *Source, r *rand.Rand, p Par }, nil } -func (s *Schema) GenDeleteRows(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenDeleteRows(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -681,7 +683,7 @@ func (s *Schema) GenDDLStmt(t *Table, r *rand.Rand, p PartitionRangeConfig, sc * } } -func (s *Schema) GenMutateStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { +func (s *Schema) GenMutateStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -704,7 +706,7 @@ func (s *Schema) GenMutateStmt(t 
*Table, source *Source, r *rand.Rand, p Partiti } } -func (s *Schema) GenCheckStmt(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) GenCheckStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { var n int if len(t.Indexes) > 0 { n = r.Intn(5) @@ -733,7 +735,7 @@ func (s *Schema) GenCheckStmt(t *Table, source *Source, r *rand.Rand, p Partitio return nil } -func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -756,6 +758,9 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, if !ok { return nil } + if len(values.Value) != len(t.PartitionKeys) { + fmt.Printf("values=%d, pk=%d\n", len(values.Value), len(t.PartitionKeys)) + } return &Stmt{ Query: builder, Values: func() (uint64, []interface{}) { @@ -765,7 +770,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Source, r *rand.Rand, } } -func (s *Schema) genMultiplePartitionQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -808,7 +813,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Source, r *rand.Ran } } -func (s *Schema) genClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -858,7 +863,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, } } -func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -914,7 +919,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Sour } } -func (s *Schema) genSingleIndexQuery(t *Table, source *Source, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSingleIndexQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() diff --git a/source.go b/source.go new file mode 100644 index 00000000..8a9c333e --- /dev/null +++ b/source.go @@ -0,0 +1,40 @@ +package gemini + +type Source struct { + values chan ValueWithToken + oldValues chan ValueWithToken +} + +//get returns a new value and ensures that it's corresponding token +//is not already in-flight. +func (s *Source) get() (ValueWithToken, bool) { + return s.pick(), true +} + +//getOld returns a previously used value and token or a new if +//the old queue is empty. +func (s *Source) getOld() (ValueWithToken, bool) { + select { + case v, ok := <-s.oldValues: + return v, ok + /*default: + // There are no old values so we generate a new + return s.get() + */ + } +} + +// giveOld returns the supplied value for later reuse unless the value +//is empty in which case it removes the corresponding token from the +// in-flight tracking. 
+func (s *Source) giveOld(v ValueWithToken) { + select { + case s.oldValues <- v: + default: + // Old source is full, just drop the value + } +} + +func (s *Source) pick() ValueWithToken { + return <-s.values +} From 2c1bb39ddd7de038e588abf5988b19b9f69ebd8a Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Mon, 2 Sep 2019 09:58:12 +0200 Subject: [PATCH 02/17] gemini: inflight cache is now sharded for better blocking behavior --- generator.go | 10 +++--- inflight.go | 39 --------------------- inflight/inflight.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 44 deletions(-) delete mode 100644 inflight.go create mode 100644 inflight/inflight.go diff --git a/generator.go b/generator.go index f453404f..0c82493a 100644 --- a/generator.go +++ b/generator.go @@ -3,7 +3,7 @@ package gemini import ( "sync" - "github.com/scylladb/go-set/u64set" + "github.com/scylladb/gemini/inflight" "github.com/scylladb/gemini/murmur" "go.uber.org/zap" @@ -14,7 +14,7 @@ type DistributionFunc func() uint64 type Generator struct { sources []*Source - inFlight *syncU64set + inFlight inflight.InFlight size uint64 distributionSize uint64 table *Table @@ -46,7 +46,7 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge } gs := &Generator{ sources: sources, - inFlight: &syncU64set{pks: u64set.New(), mu: &sync.RWMutex{}}, + inFlight: inflight.NewConcurrent(), size: config.Size, distributionSize: config.DistributionSize, table: table, @@ -64,7 +64,7 @@ func (g Generator) Get() (ValueWithToken, bool) { source := g.sources[g.idxFunc()%g.size] for { v := source.pick() - if g.inFlight.addIfNotPresent(v.Token) { + if g.inFlight.AddIfNotPresent(v.Token) { return v, true } } @@ -87,7 +87,7 @@ type ValueWithToken struct { func (g *Generator) GiveOld(v ValueWithToken) { source := g.sources[v.Token%g.size] if len(v.Value) == 0 { - g.inFlight.delete(v.Token) + g.inFlight.Delete(v.Token) return } source.giveOld(v) diff --git a/inflight.go b/inflight.go deleted file mode 100644 index ce2b702d..00000000 --- a/inflight.go +++ /dev/null @@ -1,39 +0,0 @@ -package gemini - -import ( - "sync" - - "github.com/scylladb/go-set/u64set" -) - -//syncU64set is a u64set protected by a sync.RWLock -//It could potentially become contended and then we -//should replace it with a sharded version. -type syncU64set struct { - pks *u64set.Set - mu *sync.RWMutex -} - -func (s *syncU64set) delete(v uint64) bool { - s.mu.Lock() - _, found := s.pks.Pop2() - s.mu.Unlock() - return found -} - -func (s *syncU64set) addIfNotPresent(v uint64) bool { - s.mu.RLock() - if s.pks.Has(v) { - s.mu.RUnlock() - return false - } - s.mu.RUnlock() - s.mu.Lock() - defer s.mu.Unlock() - if s.pks.Has(v) { - // double check - return false - } - s.pks.Add(v) - return true -} diff --git a/inflight/inflight.go b/inflight/inflight.go new file mode 100644 index 00000000..f7c21790 --- /dev/null +++ b/inflight/inflight.go @@ -0,0 +1,82 @@ +package inflight + +import ( + "sync" + + "github.com/scylladb/go-set/u64set" +) + +type InFlight interface { + AddIfNotPresent(uint64) bool + Delete(uint64) bool +} + +//New creates a instance of a simple InFlight set. +//It's internal data is protected by a simple sync.RWMutex. +func New() InFlight { + return newSyncU64set() +} + +func newSyncU64set() *syncU64set { + return &syncU64set{ + pks: u64set.New(), + mu: &sync.RWMutex{}, + } +} + +//NewConcurrent creates a instance of a sharded InFlight set. 
+//It shards the values over 256 buckets which should afford a +//decent increase in concurrency support. +func NewConcurrent() InFlight { + s := &shardedSyncU64set{} + for i := range s.shards { + s.shards[i] = newSyncU64set() + } + return s +} + +//shardedSyncU64set is a sharded InFlight implementation protected by a sync.RWLock +//which should support greater concurrency. +type shardedSyncU64set struct { + shards [256]*syncU64set +} + +func (s *shardedSyncU64set) Delete(v uint64) bool { + ss := s.shards[v%256] + return ss.Delete(v) +} + +func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool { + ss := s.shards[v%256] + return ss.AddIfNotPresent(v) +} + +//syncU64set is an InFlight implementation protected by a sync.RWLock +type syncU64set struct { + pks *u64set.Set + mu *sync.RWMutex +} + +func (s *syncU64set) Delete(v uint64) bool { + s.mu.Lock() + _, found := s.pks.Pop2() + s.mu.Unlock() + return found +} + +func (s *syncU64set) AddIfNotPresent(v uint64) bool { + s.mu.RLock() + if s.pks.Has(v) { + s.mu.RUnlock() + return false + } + s.mu.RUnlock() + s.mu.Lock() + defer s.mu.Unlock() + if s.pks.Has(v) { + // double check + return false + } + s.pks.Add(v) + return true +} From 6e6bdfb3d6e901f323c4bc82228b81b9b92c6eeb Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Mon, 2 Sep 2019 13:29:23 +0200 Subject: [PATCH 03/17] gemini: added inflight tests --- generator_test.go | 10 ++-- inflight/inflight.go | 4 ++ inflight/inflight_test.go | 102 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 inflight/inflight_test.go diff --git a/generator_test.go b/generator_test.go index 29672ff1..6989e5a9 100644 --- a/generator_test.go +++ b/generator_test.go @@ -1,7 +1,6 @@ package gemini import ( - "reflect" "sync/atomic" "testing" @@ -21,7 +20,7 @@ func TestGenerator(t *testing.T) { MaxBlobLength: 10, MinBlobLength: 0, }, - Size: 1, + Size: 10000, PkUsedBufferSize: 10000, DistributionSize: 1000, DistributionFunc: func() uint64 { @@ -30,12 +29,11 @@ func TestGenerator(t *testing.T) { } logger, _ := zap.NewDevelopment() generators := NewGenerator(table, cfg, logger) - source := generators.Get(0) for i := uint64(0); i < cfg.DistributionSize; i++ { atomic.StoreUint64(¤t, i) - v, _ := source.Get() - n, _ := source.Get() - if !reflect.DeepEqual(v, n) { + v, _ := generators.Get() + n, _ := generators.Get() + if v.Token%generators.size != n.Token%generators.size { t.Errorf("expected %v, got %v", v, n) } } diff --git a/inflight/inflight.go b/inflight/inflight.go index f7c21790..22108258 100644 --- a/inflight/inflight.go +++ b/inflight/inflight.go @@ -28,6 +28,10 @@ func newSyncU64set() *syncU64set { //It shards the values over 256 buckets which should afford a //decent increase in concurrency support. 
func NewConcurrent() InFlight { + return newShardedSyncU64set() +} + +func newShardedSyncU64set() *shardedSyncU64set { s := &shardedSyncU64set{} for i := range s.shards { s.shards[i] = newSyncU64set() diff --git a/inflight/inflight_test.go b/inflight/inflight_test.go new file mode 100644 index 00000000..eb9c6ee5 --- /dev/null +++ b/inflight/inflight_test.go @@ -0,0 +1,102 @@ +package inflight + +import ( + "math/rand" + "reflect" + "testing" + "testing/quick" +) + +func TestAddIfNotPresent(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + if !flight.AddIfNotPresent(10) { + t.Error("could not add the first value") + } + if flight.AddIfNotPresent(10) { + t.Error("value added twice") + } +} + +func TestDelete(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + flight.AddIfNotPresent(10) + + if !flight.Delete(10) { + t.Error("did not delete the value") + } + if flight.Delete(10) { + t.Error("deleted the value twice") + } +} + +func TestAddIfNotPresentSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + if !flight.AddIfNotPresent(10) { + t.Error("could not add the first value") + } + if flight.AddIfNotPresent(10) { + t.Error("value added twice") + } +} + +func TestDeleteSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + flight.AddIfNotPresent(10) + + if !flight.Delete(10) { + t.Error("did not delete the value") + } + if flight.Delete(10) { + t.Error("deleted the value twice") + } +} + +func TestInflight(t *testing.T) { + t.Parallel() + flight := newSyncU64set() + f := func(v uint64) interface{} { + return flight.AddIfNotPresent(v) + } + g := func(v uint64) interface{} { + return flight.Delete(v) + } + + cfg := createQuickConfig() + if err := quick.CheckEqual(f, g, cfg); err != nil { + t.Error(err) + } +} + +func TestInflightSharded(t *testing.T) { + t.Parallel() + flight := newShardedSyncU64set() + f := func(v uint64) interface{} { + return flight.AddIfNotPresent(v) + } + g := func(v uint64) interface{} { + return flight.Delete(v) + } + + cfg := createQuickConfig() + if err := quick.CheckEqual(f, g, cfg); err != nil { + t.Error(err) + } +} + +func createQuickConfig() *quick.Config { + return &quick.Config{ + MaxCount: 2000000, + Values: func(vs []reflect.Value, r *rand.Rand) { + for i := 0; i < len(vs); i++ { + uv := r.Uint64() + v := reflect.New(reflect.TypeOf(uv)).Elem() + v.SetUint(uv) + vs[i] = v + } + }, + } +} From 91042f7a9e55d3aaa3fdf7e23d6364c7b7afd341 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Mon, 2 Sep 2019 16:17:04 +0200 Subject: [PATCH 04/17] gemini: changelog for lazy partition keys --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48ff3de3..942b7f85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ # Unreleased +- Lazy partition key generation reintroduced to avoid out of memory issues. - Fix overlapping operations on the same partition key ([#198](https://github.com/scylladb/gemini/issues/198)). - Partition keys can now be drawn from various distributions such as ___"zipf"___, ___"uniform"___ and ___"normal"___. 
The CLI argument `--partition-key-distribution` is used From 2c5243347042e9e35e8997b2c4c4d7ad5108ce25 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Tue, 3 Sep 2019 16:19:34 +0200 Subject: [PATCH 05/17] inflight: correct delete functionality --- inflight/inflight.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/inflight/inflight.go b/inflight/inflight.go index 22108258..307bf74f 100644 --- a/inflight/inflight.go +++ b/inflight/inflight.go @@ -62,10 +62,20 @@ type syncU64set struct { } func (s *syncU64set) Delete(v uint64) bool { + s.mu.RLock() + if !s.pks.Has(v) { + s.mu.RUnlock() + return false + } + s.mu.RUnlock() s.mu.Lock() - _, found := s.pks.Pop2() - s.mu.Unlock() - return found + defer s.mu.Unlock() + if !s.pks.Has(v) { + // double check + return false + } + s.pks.Remove(v) + return true } func (s *syncU64set) AddIfNotPresent(v uint64) bool { From af47060bd85574a5e3b127f869967ab07d32d3b9 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 08:45:37 +0200 Subject: [PATCH 06/17] gemini: restoring reporting frequency --- cmd/gemini/jobs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index 264a97aa..438544fc 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -30,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, } else { mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger) } - if i%100 == 0 { + if i%1000 == 0 { c <- testStatus testStatus = Status{} } @@ -54,7 +54,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou for hb := range pump { hb.await() validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger) - if i%100 == 0 { + if i%1000 == 0 { c <- testStatus testStatus = Status{} } From 6375d585e9232d77354e3d9083b55905a10de7c7 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 08:46:05 +0200 Subject: [PATCH 07/17] inflight: correct comment spacing --- inflight/inflight.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/inflight/inflight.go b/inflight/inflight.go index 307bf74f..fb97c4e1 100644 --- a/inflight/inflight.go +++ b/inflight/inflight.go @@ -11,8 +11,8 @@ type InFlight interface { Delete(uint64) bool } -//New creates a instance of a simple InFlight set. -//It's internal data is protected by a simple sync.RWMutex. +// New creates a instance of a simple InFlight set. +// It's internal data is protected by a simple sync.RWMutex. func New() InFlight { return newSyncU64set() } @@ -24,9 +24,9 @@ func newSyncU64set() *syncU64set { } } -//NewConcurrent creates a instance of a sharded InFlight set. -//It shards the values over 256 buckets which should afford a -//decent increase in concurrency support. +// NewConcurrent creates a instance of a sharded InFlight set. +// It shards the values over 256 buckets which should afford a +// decent increase in concurrency support. func NewConcurrent() InFlight { return newShardedSyncU64set() } @@ -39,8 +39,8 @@ func newShardedSyncU64set() *shardedSyncU64set { return s } -//shardedSyncU64set is a sharded InFlight implementation protected by a sync.RWLock -//which should support greater concurrency. +// shardedSyncU64set is a sharded InFlight implementation protected by a sync.RWLock +// which should support greater concurrency. 
type shardedSyncU64set struct { shards [256]*syncU64set } @@ -55,7 +55,7 @@ func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool { return ss.AddIfNotPresent(v) } -//syncU64set is an InFlight implementation protected by a sync.RWLock +// syncU64set is an InFlight implementation protected by a sync.RWLock type syncU64set struct { pks *u64set.Set mu *sync.RWMutex From 4b7e31580cd27dbf696ba639b8e2c8cdf219b029 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 08:46:43 +0200 Subject: [PATCH 08/17] gemini: fixup debug code and comments --- generator.go | 14 +++----------- schema.go | 3 --- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/generator.go b/generator.go index 0c82493a..99e38b53 100644 --- a/generator.go +++ b/generator.go @@ -70,8 +70,8 @@ func (g Generator) Get() (ValueWithToken, bool) { } } -//GetOld returns a previously used value and token or a new if -//the old queue is empty. +// GetOld returns a previously used value and token or a new if +// the old queue is empty. func (g Generator) GetOld() (ValueWithToken, bool) { return g.sources[g.idxFunc()%g.size].getOld() } @@ -82,7 +82,7 @@ type ValueWithToken struct { } // GiveOld returns the supplied value for later reuse unless the value -//is empty in which case it removes the corresponding token from the +// is empty in which case it removes the corresponding token from the // in-flight tracking. func (g *Generator) GiveOld(v ValueWithToken) { source := g.sources[v.Token%g.size] @@ -116,14 +116,6 @@ func (g *Generator) start() { g.logger.Info("stopping partition key generation loop") return default: - //time.Sleep(10 * time.Microsecond) - /* - for pos, s := range g.sources { - fmt.Printf("%d, values=%d, old=%d, ", pos, len(s.values), len(s.oldValues)) - } - fmt.Println() - */ - // source is full, drop it this time } } }() diff --git a/schema.go b/schema.go index 9fae8a60..4d0f6c76 100644 --- a/schema.go +++ b/schema.go @@ -758,9 +758,6 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Ra if !ok { return nil } - if len(values.Value) != len(t.PartitionKeys) { - fmt.Printf("values=%d, pk=%d\n", len(values.Value), len(t.PartitionKeys)) - } return &Stmt{ Query: builder, Values: func() (uint64, []interface{}) { From 94466f111711ce47fc0e23ba3c7d5cd740eac4d8 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 11:34:07 +0200 Subject: [PATCH 09/17] gemini: introduce gemini.DistributionFunc contract The contract that the gemini.DistributionFunc adheres to has thus far been underspecified. This commits attempts to formalize what it is expected to return and why. 
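As an illustration only (not part of this patch), a minimal, self-contained sketch of the contract described above. The local TokenIndex and DistributionFunc declarations mirror the ones the diff below adds to generator.go: a distribution function returns a position in the token-index space, and the generator maps that position onto one of its sources.

package main

import (
	"fmt"

	"golang.org/x/exp/rand"
)

// Local stand-ins for the types this patch introduces in generator.go.
type TokenIndex uint64
type DistributionFunc func() TokenIndex

// uniformDist draws token indexes uniformly from [0, size), the same
// shape as the "uniform" case in createDistributionFunc below.
func uniformDist(seed, size uint64) DistributionFunc {
	rnd := rand.New(rand.NewSource(seed))
	return func() TokenIndex {
		return TokenIndex(rnd.Uint64n(size))
	}
}

func main() {
	dist := uniformDist(1, 10000)
	sources := uint64(16) // hypothetical generator with 16 sources
	for i := 0; i < 3; i++ {
		// The generator reduces the drawn index onto a source slot.
		fmt.Println("token index maps to source", uint64(dist())%sources)
	}
}
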
--- cmd/gemini/root.go | 14 +++++++------- generator.go | 19 +++++++++++++++---- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 61810cbc..c8e3e43c 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -260,9 +260,9 @@ const ( func createDistributionFunc(distribution string, size, seed uint64, mu, sigma float64) (gemini.DistributionFunc, error) { switch strings.ToLower(distribution) { case "zipf": - dist := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.1, 1.1, size-1) - return func() uint64 { - return dist.Uint64() + dist := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.1, 1.1, size) + return func() gemini.TokenIndex { + return gemini.TokenIndex(dist.Uint64()) }, nil case "normal": dist := distuv.Normal{ @@ -270,13 +270,13 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl Mu: mu, Sigma: sigma, } - return func() uint64 { - return uint64(dist.Rand()) * size + return func() gemini.TokenIndex { + return gemini.TokenIndex(dist.Rand()) }, nil case "uniform": rnd := rand.New(rand.NewSource(seed)) - return func() uint64 { - return rnd.Uint64n(size) + return func() gemini.TokenIndex { + return gemini.TokenIndex(rnd.Uint64n(size)) }, nil default: return nil, errors.Errorf("unsupported distribution: %s", distribution) diff --git a/generator.go b/generator.go index 99e38b53..1acd6233 100644 --- a/generator.go +++ b/generator.go @@ -10,7 +10,18 @@ import ( "golang.org/x/exp/rand" ) -type DistributionFunc func() uint64 +// TokenIndex represents the position of a token in the token ring. +// A token index is translated to a token by a generator. If the generator +// preserves the exact position, then the token index becomes the token; +// otherwise token index represents an approximation of the token. +// +// We use a token index approach, because our generators actually generate +// partition keys, and map them to tokens. The generators, therefore, do +// not populate the full token ring space. With token index, we can +// approximate different token distributions from a sparse set of tokens. +type TokenIndex uint64 + +type DistributionFunc func() TokenIndex type Generator struct { sources []*Source @@ -20,7 +31,7 @@ type Generator struct { table *Table partitionsConfig PartitionRangeConfig seed uint64 - idxFunc func() uint64 + idxFunc DistributionFunc doneCh chan struct{} logger *zap.Logger } @@ -61,7 +72,7 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge } func (g Generator) Get() (ValueWithToken, bool) { - source := g.sources[g.idxFunc()%g.size] + source := g.sources[uint64(g.idxFunc())%g.size] for { v := source.pick() if g.inFlight.AddIfNotPresent(v.Token) { @@ -73,7 +84,7 @@ func (g Generator) Get() (ValueWithToken, bool) { // GetOld returns a previously used value and token or a new if // the old queue is empty. 
func (g Generator) GetOld() (ValueWithToken, bool) { - return g.sources[g.idxFunc()%g.size].getOld() + return g.sources[uint64(g.idxFunc())%g.size].getOld() } type ValueWithToken struct { From 84dbd28aa29a6f463cb40be975394366d43cb1f7 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 14:04:18 +0200 Subject: [PATCH 10/17] gemini: cleanup after review --- cmd/gemini/jobs.go | 18 +++++++----------- cmd/gemini/root.go | 4 ---- schema.go | 2 -- source.go | 2 +- 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index 438544fc..a1bea70c 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "math" "sync" "time" @@ -107,19 +106,16 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, workerCtx, _ := context.WithCancel(context.Background()) workers.Add(len(schema.Tables) * int(actors)) + partitionRangeConfig := gemini.PartitionRangeConfig{ + MaxBlobLength: schemaConfig.MaxBlobLength, + MinBlobLength: schemaConfig.MinBlobLength, + MaxStringLength: schemaConfig.MaxStringLength, + MinStringLength: schemaConfig.MinStringLength, + } + for j, table := range schema.Tables { g := generators[j] - delta := math.MaxUint64 % actors for i := 0; i < int(actors); i++ { - start := uint64(i) - partitionRangeConfig := gemini.PartitionRangeConfig{ - Left: start * delta, - Right: start*delta + delta, - MaxBlobLength: schemaConfig.MaxBlobLength, - MinBlobLength: schemaConfig.MinBlobLength, - MaxStringLength: schemaConfig.MaxStringLength, - MinStringLength: schemaConfig.MinStringLength, - } r := rand.New(rand.NewSource(seed)) go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger) } diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index c8e3e43c..b323afa3 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -249,10 +249,6 @@ func createFile(fname string, def *os.File) (*os.File, error) { } const ( - /* - stdDistMean = 0.5 - oneStdDev = 0.341 - */ stdDistMean = math.MaxUint64 / 2 oneStdDev = 0.341 * math.MaxUint64 ) diff --git a/schema.go b/schema.go index 4d0f6c76..51237d14 100644 --- a/schema.go +++ b/schema.go @@ -352,8 +352,6 @@ type Schema struct { } type PartitionRangeConfig struct { - Left uint64 - Right uint64 MaxBlobLength int MinBlobLength int MaxStringLength int diff --git a/source.go b/source.go index 8a9c333e..0812cfa1 100644 --- a/source.go +++ b/source.go @@ -25,7 +25,7 @@ func (s *Source) getOld() (ValueWithToken, bool) { } // giveOld returns the supplied value for later reuse unless the value -//is empty in which case it removes the corresponding token from the +// is empty in which case it removes the corresponding token from the // in-flight tracking. func (s *Source) giveOld(v ValueWithToken) { select { From 583d711e4317542bbefe4a26704c4061fe44e32f Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 4 Sep 2019 15:30:54 +0200 Subject: [PATCH 11/17] gemini: tomb package used to coordinate shutdown Hand wiring goroutine termination was messy. Using the package gopkg.in/tomb.v2 makes it much simpler, safer and nicer. This commit fixes a deadlock that happened sometimes diring heavy operations and external shutdown. 
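As an illustration only (not part of this patch), a minimal sketch of the tomb.v2 pattern the change adopts: workers are started with t.Go, watch t.Dying() for the shutdown signal, and t.Wait() blocks until every managed goroutine has returned. The worker loop and timings below are made up for the example.

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func main() {
	t := &tomb.Tomb{}
	for i := 0; i < 3; i++ {
		worker := i
		t.Go(func() error {
			for {
				select {
				case <-t.Dying():
					// Shutdown was requested via t.Kill; exit cleanly.
					fmt.Println("worker", worker, "stopping")
					return nil
				case <-time.After(10 * time.Millisecond):
					// Simulated unit of work.
				}
			}
		})
	}
	// Request shutdown after a while and wait for all workers to finish.
	time.AfterFunc(50*time.Millisecond, func() { t.Kill(nil) })
	_ = t.Wait()
}
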
--- cmd/gemini/jobs.go | 40 ++++++++++----------- cmd/gemini/pump.go | 32 +++++++++-------- cmd/gemini/root.go | 82 ++++++++++++++++++++++++++++---------------- cmd/gemini/status.go | 4 +-- generator.go | 41 ++++++++++++++-------- go.mod | 1 + go.sum | 2 ++ source.go | 11 +++--- 8 files changed, 125 insertions(+), 88 deletions(-) diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index a1bea70c..6376c59c 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -4,22 +4,25 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" "github.com/scylladb/gemini" "github.com/scylladb/gemini/store" "go.uber.org/zap" "golang.org/x/exp/rand" + "gopkg.in/tomb.v2" ) // MutationJob continuously applies mutations against the database // for as long as the pump is active. -func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("mutation_job") testStatus := Status{} + defer func() { + // Send any remaining updates back + c <- testStatus + }() var i int for hb := range pump { hb.await() @@ -35,7 +38,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, } if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) { c <- testStatus - break + return } i++ } @@ -43,12 +46,14 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, // ValidationJob continuously applies validations against the database // for as long as the pump is active. -func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("validation_job") testStatus := Status{} + defer func() { + c <- testStatus + }() var i int for hb := range pump { hb.await() @@ -58,8 +63,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou testStatus = Status{} } if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) { - c <- testStatus - break + return } i++ } @@ -67,8 +71,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou // WarmupJob continuously applies mutations against the database // for as long as the pump is active or the supplied duration expires. 
-func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { - defer wg.Done() +func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg testStatus := Status{} var i int @@ -96,16 +99,8 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s } } -func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { - defer done.Done() - var finished sync.WaitGroup - finished.Add(1) - - // Wait group for the worker goroutines. - var workers sync.WaitGroup +func job(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { workerCtx, _ := context.WithCancel(context.Background()) - workers.Add(len(schema.Tables) * int(actors)) - partitionRangeConfig := gemini.PartitionRangeConfig{ MaxBlobLength: schemaConfig.MaxBlobLength, MinBlobLength: schemaConfig.MinBlobLength, @@ -117,11 +112,12 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, g := generators[j] for i := 0; i < int(actors); i++ { r := rand.New(rand.NewSource(seed)) - go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger) + t.Go(func() error { + f(workerCtx, pump.ch, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger) + return nil + }) } } - - workers.Wait() } func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) { diff --git a/cmd/gemini/pump.go b/cmd/gemini/pump.go index c591cf1a..0421e8a9 100644 --- a/cmd/gemini/pump.go +++ b/cmd/gemini/pump.go @@ -7,12 +7,14 @@ import ( "time" "github.com/briandowns/spinner" + "github.com/scylladb/gemini" "go.uber.org/zap" + "gopkg.in/tomb.v2" ) type Pump struct { ch chan heartBeat - done chan struct{} + t *tomb.Tomb graceful chan os.Signal logger *zap.Logger } @@ -28,29 +30,26 @@ func (hb heartBeat) await() { } func (p *Pump) Start(d time.Duration, postFunc func()) { - go func() { + p.t.Go(func() error { defer p.cleanup(postFunc) timer := time.NewTimer(d) for { select { - case <-p.done: + case <-p.t.Dying(): p.logger.Info("Test run stopped. Exiting.") - return + return nil case <-p.graceful: p.logger.Info("Test run aborted. Exiting.") - return + p.t.Kill(nil) + return nil case <-timer.C: p.logger.Info("Test run completed. 
Exiting.") - return + p.t.Kill(nil) + return nil case p.ch <- newHeartBeat(): } } - }() -} - -func (p *Pump) Stop() { - p.logger.Debug("pump asked to stop") - p.done <- struct{}{} + }) } func (p *Pump) cleanup(postFunc func()) { @@ -61,23 +60,26 @@ func (p *Pump) cleanup(postFunc func()) { postFunc() } -func createPump(sz int, logger *zap.Logger) *Pump { +func createPump(t *tomb.Tomb, sz int, logger *zap.Logger) *Pump { logger = logger.Named("pump") var graceful = make(chan os.Signal, 1) signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT) pump := &Pump{ ch: make(chan heartBeat, sz), - done: make(chan struct{}, 1), + t: t, graceful: graceful, logger: logger, } return pump } -func createPumpCallback(result chan Status, sp *spinner.Spinner) func() { +func createPumpCallback(generators []*gemini.Generator, result chan Status, sp *spinner.Spinner) func() { return func() { if sp != nil { sp.Stop() } + for _, g := range generators { + g.Stop() + } } } diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index b323afa3..befc590d 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -11,7 +11,6 @@ import ( _ "net/http/pprof" "os" "strings" - "sync" "text/tabwriter" "time" @@ -28,6 +27,7 @@ import ( "golang.org/x/exp/rand" "golang.org/x/net/context" "gonum.org/v1/gonum/stat/distuv" + "gopkg.in/tomb.v2" ) var ( @@ -88,7 +88,7 @@ func interactive() bool { return !nonInteractive } -type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger) +type testJob func(context.Context, <-chan heartBeat, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger) func readSchema(confFile string) (*gemini.Schema, error) { byteValue, err := ioutil.ReadFile(confFile) @@ -205,30 +205,25 @@ func run(cmd *cobra.Command, args []string) error { } } - done := &sync.WaitGroup{} - done.Add(1) + t := &tomb.Tomb{} result := make(chan Status, 10000) endResult := make(chan Status, 1) - pump := createPump(10000, logger) + pump := createPump(t, 10000, logger) generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger) - go func() { - defer func() { - for _, g := range generators { - g.Stop() - } - }() - defer done.Done() + t.Go(func() error { var sp *spinner.Spinner = nil if interactive() { sp = createSpinner() } - pump.Start(duration+warmup, createPumpCallback(result, sp)) - endResult <- sampleStatus(pump, result, sp, logger) - }() + pump.Start(duration+warmup, createPumpCallback(generators, result, sp)) + endResult <- sampleStatus(result, sp, logger) + return nil + }) launch(schema, schemaConfig, store, pump, generators, result, logger) close(result) - done.Wait() + logger.Info("result channel closed") + _ = t.Wait() res := <-endResult res.PrintResult(outFile, schema) if res.HasErrors() { @@ -280,29 +275,56 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl } func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) { - if warmup > 0 { - done := &sync.WaitGroup{} - done.Add(1) - job(done, WarmupJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) - done.Wait() - logger.Info("Warmup done") + if doWarmup(schema, 
schemaConfig, store, pump, generators, result, logger) { + logger.Info("doWarmup terminates launch") + return } - done := &sync.WaitGroup{} - done.Add(1) + t := &tomb.Tomb{} switch mode { case writeMode: - go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob) case readMode: - go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, ValidationJob) default: - done.Add(1) - go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) - go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger) + entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob, ValidationJob) } - done.Wait() + _ = t.Wait() logger.Info("All jobs complete") } +func doWarmup(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) bool { + if warmup > 0 { + t := &tomb.Tomb{} + entombJobs(t, concurrency, schema, schemaConfig, s, pump, generators, result, logger, WarmupJob) + _ = t.Wait() + logger.Info("Warmup done") + select { + case <-pump.t.Dying(): + logger.Info("Warmup dying") + return true + default: + logger.Info("Warmup not dying") + } + } + return false +} + +func wrapJobInTombFunc(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) func() error { + return func() error { + job(t, f, concurrency, schema, schemaConfig, s, pump, generators, result, logger) + return nil + } +} + +func entombJobs(t *tomb.Tomb, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger, fs ...testJob) { + t.Go(func() error { + for _, f := range fs { + t.Go(wrapJobInTombFunc(t, f, actors, schema, schemaConfig, s, pump, generators, result, logger)) + } + return nil + }) +} + func createLogger(level string) *zap.Logger { lvl := zap.NewAtomicLevel() if err := lvl.UnmarshalText([]byte(level)); err != nil { diff --git a/cmd/gemini/status.go b/cmd/gemini/status.go index 96055ed0..3cd633b5 100644 --- a/cmd/gemini/status.go +++ b/cmd/gemini/status.go @@ -70,7 +70,7 @@ func (r Status) HasErrors() bool { return r.WriteErrors > 0 || r.ReadErrors > 0 } -func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status { +func sampleStatus(c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status { failfastDone := sync.Once{} logger = logger.Named("sample_results") var testRes Status @@ -83,8 +83,8 @@ func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logge if failFast { failfastDone.Do(func() { logger.Warn("Errors detected. 
Exiting.") - p.Stop() }) + return testRes } } } diff --git a/generator.go b/generator.go index 1acd6233..ccea7ec3 100644 --- a/generator.go +++ b/generator.go @@ -1,13 +1,11 @@ package gemini import ( - "sync" - "github.com/scylladb/gemini/inflight" - "github.com/scylladb/gemini/murmur" "go.uber.org/zap" "golang.org/x/exp/rand" + "gopkg.in/tomb.v2" ) // TokenIndex represents the position of a token in the token ring. @@ -32,7 +30,7 @@ type Generator struct { partitionsConfig PartitionRangeConfig seed uint64 idxFunc DistributionFunc - doneCh chan struct{} + t *tomb.Tomb logger *zap.Logger } @@ -46,13 +44,13 @@ type GeneratorConfig struct { } func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator { + t := &tomb.Tomb{} sources := make([]*Source, config.Size) for i := uint64(0); i < config.Size; i++ { - done := &sync.WaitGroup{} - done.Add(1) sources[i] = &Source{ values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), + t: t, } } gs := &Generator{ @@ -64,7 +62,7 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge partitionsConfig: config.Partitions, seed: config.Seed, idxFunc: config.DistributionFunc, - doneCh: make(chan struct{}, 1), + t: t, logger: logger, } gs.start() @@ -72,6 +70,11 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge } func (g Generator) Get() (ValueWithToken, bool) { + select { + case <-g.t.Dying(): + return emptyValueWithToken, false + default: + } source := g.sources[uint64(g.idxFunc())%g.size] for { v := source.pick() @@ -84,6 +87,11 @@ func (g Generator) Get() (ValueWithToken, bool) { // GetOld returns a previously used value and token or a new if // the old queue is empty. func (g Generator) GetOld() (ValueWithToken, bool) { + select { + case <-g.t.Dying(): + return emptyValueWithToken, false + default: + } return g.sources[uint64(g.idxFunc())%g.size].getOld() } @@ -96,6 +104,11 @@ type ValueWithToken struct { // is empty in which case it removes the corresponding token from the // in-flight tracking. 
func (g *Generator) GiveOld(v ValueWithToken) { + select { + case <-g.t.Dying(): + return + default: + } source := g.sources[v.Token%g.size] if len(v.Value) == 0 { g.inFlight.Delete(v.Token) @@ -105,14 +118,12 @@ func (g *Generator) GiveOld(v ValueWithToken) { } func (g *Generator) Stop() { - g.doneCh <- struct{}{} - for _, s := range g.sources { - close(s.oldValues) - } + g.t.Kill(nil) + _ = g.t.Wait() } func (g *Generator) start() { - go func() { + g.t.Go(func() error { g.logger.Info("starting partition key generation loop") routingKeyCreator := &RoutingKeyCreator{} r := rand.New(rand.NewSource(g.seed)) @@ -123,13 +134,13 @@ func (g *Generator) start() { source := g.sources[idx] select { case source.values <- ValueWithToken{Token: hash, Value: values}: - case <-g.doneCh: + case <-g.t.Dying(): g.logger.Info("stopping partition key generation loop") - return + return nil default: } } - }() + }) } func (g *Generator) createPartitionKeyValues(r *rand.Rand) []interface{} { diff --git a/go.mod b/go.mod index 11c5c441..168c8e38 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect gonum.org/v1/gonum v0.0.0-20190724133715-a8659125a966 gopkg.in/inf.v0 v0.9.1 + gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) replace github.com/gocql/gocql => github.com/scylladb/gocql v1.2.0 diff --git a/go.sum b/go.sum index fbccf8ff..3b23b494 100644 --- a/go.sum +++ b/go.sum @@ -144,5 +144,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/source.go b/source.go index 0812cfa1..d1cf44c4 100644 --- a/source.go +++ b/source.go @@ -1,8 +1,11 @@ package gemini +import "gopkg.in/tomb.v2" + type Source struct { values chan ValueWithToken oldValues chan ValueWithToken + t *tomb.Tomb } //get returns a new value and ensures that it's corresponding token @@ -11,16 +14,16 @@ func (s *Source) get() (ValueWithToken, bool) { return s.pick(), true } +var emptyValueWithToken = ValueWithToken{} + //getOld returns a previously used value and token or a new if //the old queue is empty. 
func (s *Source) getOld() (ValueWithToken, bool) { select { + case <-s.t.Dying(): + return emptyValueWithToken, false case v, ok := <-s.oldValues: return v, ok - /*default: - // There are no old values so we generate a new - return s.get() - */ } } From 0cc70d5284554a9af2dbc8b8f006a0af336fb135 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Thu, 5 Sep 2019 10:18:05 +0200 Subject: [PATCH 12/17] gemini: using a simple locked inflight cache --- generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generator.go b/generator.go index ccea7ec3..194f23a3 100644 --- a/generator.go +++ b/generator.go @@ -55,7 +55,7 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge } gs := &Generator{ sources: sources, - inFlight: inflight.NewConcurrent(), + inFlight: inflight.New(), size: config.Size, distributionSize: config.DistributionSize, table: table, From 1fc670c70788162f4770d94cc0aad8fd1ee1a7a6 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Thu, 5 Sep 2019 13:25:33 +0200 Subject: [PATCH 13/17] gemini: renamed source to partition --- cmd/gemini/generators.go | 11 ++++---- cmd/gemini/jobs.go | 24 ++++++++--------- cmd/gemini/root.go | 10 +++---- docs/architecture.md | 2 +- generator.go | 57 +++++++++++++++++++++------------------ generator_test.go | 12 ++++----- source.go => partition.go | 20 +++++++------- schema.go | 56 +++++++++++++++++++------------------- 8 files changed, 98 insertions(+), 94 deletions(-) rename source.go => partition.go (54%) diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go index bb5f1ae9..ea343a06 100644 --- a/cmd/gemini/generators.go +++ b/cmd/gemini/generators.go @@ -16,12 +16,11 @@ func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, d var gs []*gemini.Generator for _, table := range schema.Tables { gCfg := &gemini.GeneratorConfig{ - Partitions: partitionRangeConfig, - Size: actors, - DistributionSize: distributionSize, - DistributionFunc: distributionFunc, - Seed: seed, - PkUsedBufferSize: pkBufferReuseSize, + PartitionsRangeConfig: partitionRangeConfig, + PartitionsCount: distributionSize, + PartitionsDistributionFunc: distributionFunc, + Seed: seed, + PkUsedBufferSize: pkBufferReuseSize, } g := gemini.NewGenerator(table, gCfg, logger.Named("generator")) gs = append(gs, g) diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index 6376c59c..e93f7e4c 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -15,7 +15,7 @@ import ( // MutationJob continuously applies mutations against the database // for as long as the pump is active. 
-func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("mutation_job") testStatus := Status{} @@ -30,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche if ind%100000 == 0 { ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger) } else { - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, true, logger) } if i%1000 == 0 { c <- testStatus @@ -46,7 +46,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche // ValidationJob continuously applies validations against the database // for as long as the pump is active. -func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("validation_job") @@ -57,7 +57,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc var i int for hb := range pump { hb.await() - validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger) + validation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -71,7 +71,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc // WarmupJob continuously applies mutations against the database // for as long as the pump is active or the supplied duration expires. 
-func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg testStatus := Status{} var i int @@ -90,7 +90,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema c <- testStatus return default: - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, false, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -165,8 +165,8 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta } } -func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { - mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes) +func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { + mutateStmt, err := schema.GenMutateStmt(table, g, r, p, deletes) if err != nil { logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) testStatus.WriteErrors++ @@ -183,7 +183,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig defer func() { v := make(gemini.Value, len(table.PartitionKeys)) copy(v, mutateValues) - source.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) + g.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) @@ -201,8 +201,8 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig } } -func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, logger *zap.Logger) { - checkStmt := schema.GenCheckStmt(table, source, r, p) +func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) { + checkStmt := schema.GenCheckStmt(table, g, r, p) if checkStmt == nil { if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { w.Write(zap.String("job", "validation")) @@ -213,7 +213,7 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf token, checkValues := checkStmt.Values() defer func() { // Signal done with this pk... 
- source.GiveOld(gemini.ValueWithToken{Token: token}) + g.GiveOld(gemini.ValueWithToken{Token: token}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL())) diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index befc590d..9da0624a 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -60,7 +60,7 @@ var ( maxRetriesMutate int maxRetriesMutateSleep time.Duration pkBufferReuseSize uint64 - distributionSize uint64 + partitionCount uint64 partitionKeyDistribution string normalDistMean float64 normalDistSigma float64 @@ -137,7 +137,7 @@ func run(cmd *cobra.Command, args []string) error { if err := printSetup(); err != nil { return errors.Wrapf(err, "unable to print setup") } - distFunc, err := createDistributionFunc(partitionKeyDistribution, distributionSize, seed, stdDistMean, oneStdDev) + distFunc, err := createDistributionFunc(partitionKeyDistribution, math.MaxUint64, seed, stdDistMean, oneStdDev) if err != nil { return err } @@ -209,7 +209,7 @@ func run(cmd *cobra.Command, args []string) error { result := make(chan Status, 10000) endResult := make(chan Status, 1) pump := createPump(t, 10000, logger) - generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger) + generators := createGenerators(schema, schemaConfig, distFunc, concurrency, partitionCount, logger) t.Go(func() error { var sp *spinner.Spinner = nil if interactive() { @@ -445,8 +445,8 @@ func init() { rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal") rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") rootCmd.Flags().DurationVarP(&maxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond, "Duration between attempts to apply a mutation for example 10ms or 1s") - rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 2000, "Number of reused buffered partition keys") - rootCmd.Flags().Uint64VarP(&distributionSize, "distribution-size", "", math.MaxUint64, "Number of partition keys each worker creates") + rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 100, "Number of reused buffered partition keys") + rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 10000, "Number of slices to divide the token space into") rootCmd.Flags().StringVarP(&partitionKeyDistribution, "partition-key-distribution", "", "uniform", "Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|zipf") rootCmd.Flags().Float64VarP(&normalDistMean, "normal-dist-mean", "", stdDistMean, "Mean of the normal distribution") rootCmd.Flags().Float64VarP(&normalDistSigma, "normal-dist-sigma", "", oneStdDev, "Sigma of the normal distribution, defaults to one standard deviation ~0.341") diff --git a/docs/architecture.md b/docs/architecture.md index ed70d626..63c028cc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -55,7 +55,7 @@ in use but the idea is to introduce some jitter into the execution flow. The application generates partition ids through a `Generator` that creates a steady flow of partition key components for the desired [concurrency](architecture.md#Concurrency). -Each goroutine is connected to a `source` that the generator controls. 
This source continuously emits +Each goroutine is connected to a `partition` that the generator controls. This partition continuously emits new partition ids in the form of a `[]interface{}`. These keys are created in the same way as the the driver does to ensure that each goroutine only processes partition keys from it's designated bucket. These partition keys These values are copied into another list that keeps the old partition ids for diff --git a/generator.go b/generator.go index 194f23a3..823321b9 100644 --- a/generator.go +++ b/generator.go @@ -22,10 +22,9 @@ type TokenIndex uint64 type DistributionFunc func() TokenIndex type Generator struct { - sources []*Source + partitions []*Partition inFlight inflight.InFlight - size uint64 - distributionSize uint64 + partitionCount uint64 table *Table partitionsConfig PartitionRangeConfig seed uint64 @@ -35,33 +34,31 @@ type Generator struct { } type GeneratorConfig struct { - Partitions PartitionRangeConfig - DistributionSize uint64 - DistributionFunc DistributionFunc - Size uint64 - Seed uint64 - PkUsedBufferSize uint64 + PartitionsRangeConfig PartitionRangeConfig + PartitionsCount uint64 + PartitionsDistributionFunc DistributionFunc + Seed uint64 + PkUsedBufferSize uint64 } func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator { t := &tomb.Tomb{} - sources := make([]*Source, config.Size) - for i := uint64(0); i < config.Size; i++ { - sources[i] = &Source{ + partitions := make([]*Partition, config.PartitionsCount) + for i := 0; i < len(partitions); i++ { + partitions[i] = &Partition{ values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), t: t, } } gs := &Generator{ - sources: sources, + partitions: partitions, inFlight: inflight.New(), - size: config.Size, - distributionSize: config.DistributionSize, + partitionCount: config.PartitionsCount, table: table, - partitionsConfig: config.Partitions, + partitionsConfig: config.PartitionsRangeConfig, seed: config.Seed, - idxFunc: config.DistributionFunc, + idxFunc: config.PartitionsDistributionFunc, t: t, logger: logger, } @@ -75,9 +72,9 @@ func (g Generator) Get() (ValueWithToken, bool) { return emptyValueWithToken, false default: } - source := g.sources[uint64(g.idxFunc())%g.size] + partition := g.partitions[uint64(g.idxFunc())%g.partitionCount] for { - v := source.pick() + v := partition.pick() if g.inFlight.AddIfNotPresent(v.Token) { return v, true } @@ -92,7 +89,7 @@ func (g Generator) GetOld() (ValueWithToken, bool) { return emptyValueWithToken, false default: } - return g.sources[uint64(g.idxFunc())%g.size].getOld() + return g.partitions[uint64(g.idxFunc())%g.partitionCount].getOld() } type ValueWithToken struct { @@ -109,12 +106,12 @@ func (g *Generator) GiveOld(v ValueWithToken) { return default: } - source := g.sources[v.Token%g.size] + partition := g.partitions[v.Token%g.partitionCount] if len(v.Value) == 0 { g.inFlight.Delete(v.Token) return } - source.giveOld(v) + partition.giveOld(v) } func (g *Generator) Stop() { @@ -127,15 +124,23 @@ func (g *Generator) start() { g.logger.Info("starting partition key generation loop") routingKeyCreator := &RoutingKeyCreator{} r := rand.New(rand.NewSource(g.seed)) + var ( + cntCreated uint64 + cntEmitted uint64 + ) for { values := g.createPartitionKeyValues(r) hash := hash(routingKeyCreator, g.table, values) - idx := hash % g.size - source := g.sources[idx] + idx := hash % g.partitionCount + partition := g.partitions[idx] + cntCreated++ select { - 
case source.values <- ValueWithToken{Token: hash, Value: values}: + case partition.values <- ValueWithToken{Token: hash, Value: values}: + cntEmitted++ case <-g.t.Dying(): - g.logger.Info("stopping partition key generation loop") + g.logger.Info("stopping partition key generation loop", + zap.Uint64("keys_created", cntCreated), + zap.Uint64("keys_emitted", cntEmitted)) return nil default: } diff --git a/generator_test.go b/generator_test.go index 6989e5a9..adc5f8f5 100644 --- a/generator_test.go +++ b/generator_test.go @@ -14,7 +14,7 @@ func TestGenerator(t *testing.T) { } var current uint64 cfg := &GeneratorConfig{ - Partitions: PartitionRangeConfig{ + PartitionsRangeConfig: PartitionRangeConfig{ MaxStringLength: 10, MinStringLength: 0, MaxBlobLength: 10, @@ -22,18 +22,18 @@ func TestGenerator(t *testing.T) { }, Size: 10000, PkUsedBufferSize: 10000, - DistributionSize: 1000, - DistributionFunc: func() uint64 { - return atomic.LoadUint64(¤t) + PartitionsCount: 1000, + PartitionsDistributionFunc: func() TokenIndex { + return TokenIndex(atomic.LoadUint64(¤t)) }, } logger, _ := zap.NewDevelopment() generators := NewGenerator(table, cfg, logger) - for i := uint64(0); i < cfg.DistributionSize; i++ { + for i := uint64(0); i < cfg.PartitionsCount; i++ { atomic.StoreUint64(¤t, i) v, _ := generators.Get() n, _ := generators.Get() - if v.Token%generators.size != n.Token%generators.size { + if v.Token%generators.partitionCount != n.Token%generators.partitionCount { t.Errorf("expected %v, got %v", v, n) } } diff --git a/source.go b/partition.go similarity index 54% rename from source.go rename to partition.go index d1cf44c4..be713c5e 100644 --- a/source.go +++ b/partition.go @@ -2,23 +2,23 @@ package gemini import "gopkg.in/tomb.v2" -type Source struct { +type Partition struct { values chan ValueWithToken oldValues chan ValueWithToken t *tomb.Tomb } -//get returns a new value and ensures that it's corresponding token -//is not already in-flight. -func (s *Source) get() (ValueWithToken, bool) { +// get returns a new value and ensures that it's corresponding token +// is not already in-flight. +func (s *Partition) get() (ValueWithToken, bool) { return s.pick(), true } var emptyValueWithToken = ValueWithToken{} -//getOld returns a previously used value and token or a new if -//the old queue is empty. -func (s *Source) getOld() (ValueWithToken, bool) { +// getOld returns a previously used value and token or a new if +// the old queue is empty. +func (s *Partition) getOld() (ValueWithToken, bool) { select { case <-s.t.Dying(): return emptyValueWithToken, false @@ -30,14 +30,14 @@ func (s *Source) getOld() (ValueWithToken, bool) { // giveOld returns the supplied value for later reuse unless the value // is empty in which case it removes the corresponding token from the // in-flight tracking. 
-func (s *Source) giveOld(v ValueWithToken) { +func (s *Partition) giveOld(v ValueWithToken) { select { case s.oldValues <- v: default: - // Old source is full, just drop the value + // Old partition buffer is full, just drop the value } } -func (s *Source) pick() ValueWithToken { +func (s *Partition) pick() ValueWithToken { return <-s.values } diff --git a/schema.go b/schema.go index 51237d14..12b569a2 100644 --- a/schema.go +++ b/schema.go @@ -536,7 +536,7 @@ func (s *Schema) GetCreateSchema() []string { return stmts } -func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -549,7 +549,7 @@ func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p Part typs = append(typs, pk.Type) } - valuesWithToken, ok := source.Get() + valuesWithToken, ok := g.Get() if !ok { return nil, nil } @@ -578,11 +578,11 @@ func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p Part }, nil } -func (s *Schema) GenInsertJsonStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertJsonStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() - vals, ok := source.Get() + vals, ok := g.Get() if !ok { return nil, nil } @@ -636,7 +636,7 @@ func (s *Schema) GenInsertJsonStmt(t *Table, source *Generator, r *rand.Rand, p }, nil } -func (s *Schema) GenDeleteRows(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenDeleteRows(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -649,7 +649,7 @@ func (s *Schema) GenDeleteRows(t *Table, source *Generator, r *rand.Rand, p Part typs = append(typs, pk.Type) } - vs, ok := source.Get() + vs, ok := g.Get() if !ok { return nil, nil } @@ -681,30 +681,30 @@ func (s *Schema) GenDDLStmt(t *Table, r *rand.Rand, p PartitionRangeConfig, sc * } } -func (s *Schema) GenMutateStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { +func (s *Schema) GenMutateStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() if !deletes { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } switch n := rand.Intn(1000); n { case 10, 100: - return s.GenDeleteRows(t, source, r, p) + return s.GenDeleteRows(t, g, r, p) default: switch n := rand.Intn(2); n { case 0: if t.KnownIssues[KnownIssuesJsonWithTuples] { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } - return s.GenInsertJsonStmt(t, source, r, p) + return s.GenInsertJsonStmt(t, g, r, p) default: - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } } } -func (s *Schema) GenCheckStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) GenCheckStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { var n int if len(t.Indexes) > 0 { n = r.Intn(5) @@ -713,27 +713,27 @@ func (s *Schema) GenCheckStmt(t *Table, source *Generator, r *rand.Rand, p Parti } switch n { case 0: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) case 1: - return s.genMultiplePartitionQuery(t, source, 
r, p) + return s.genMultiplePartitionQuery(t, g, r, p) case 2: - return s.genClusteringRangeQuery(t, source, r, p) + return s.genClusteringRangeQuery(t, g, r, p) case 3: - return s.genMultiplePartitionClusteringRangeQuery(t, source, r, p) + return s.genMultiplePartitionClusteringRangeQuery(t, g, r, p) case 4: // Reducing the probability to hit these since they often take a long time to run n := r.Intn(5) switch n { case 0: - return s.genSingleIndexQuery(t, source, r, p) + return s.genSingleIndexQuery(t, g, r, p) default: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) } } return nil } -func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -752,7 +752,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Ra builder = builder.Where(qb.Eq(pk.Name)) typs = append(typs, pk.Type) } - values, ok := source.GetOld() + values, ok := g.GetOld() if !ok { return nil } @@ -765,7 +765,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Ra } } -func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -791,7 +791,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand. for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -808,7 +808,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand. } } -func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -827,7 +827,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Ra clusteringKeys = t.MaterializedViews[view].ClusteringKeys }*/ builder := qb.Select(s.Keyspace.Name + "." + tableName) - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { // Done or no values available... 
return nil @@ -858,7 +858,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Ra } } -func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -885,7 +885,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Gene for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -914,7 +914,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Gene } } -func (s *Schema) genSingleIndexQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSingleIndexQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() From c548d3adbc1bec6e859d7f84eae6750669787452 Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Thu, 5 Sep 2019 14:47:34 +0200 Subject: [PATCH 14/17] inflight: simplified deletes --- inflight/inflight.go | 23 ++++++++--------------- inflight/inflight_test.go | 18 ++++++++---------- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/inflight/inflight.go b/inflight/inflight.go index fb97c4e1..bd8f1c26 100644 --- a/inflight/inflight.go +++ b/inflight/inflight.go @@ -8,7 +8,7 @@ import ( type InFlight interface { AddIfNotPresent(uint64) bool - Delete(uint64) bool + Delete(uint64) } // New creates a instance of a simple InFlight set. @@ -45,9 +45,9 @@ type shardedSyncU64set struct { shards [256]*syncU64set } -func (s *shardedSyncU64set) Delete(v uint64) bool { +func (s *shardedSyncU64set) Delete(v uint64) { ss := s.shards[v%256] - return ss.Delete(v) + ss.Delete(v) } func (s *shardedSyncU64set) AddIfNotPresent(v uint64) bool { @@ -61,21 +61,10 @@ type syncU64set struct { mu *sync.RWMutex } -func (s *syncU64set) Delete(v uint64) bool { - s.mu.RLock() - if !s.pks.Has(v) { - s.mu.RUnlock() - return false - } - s.mu.RUnlock() +func (s *syncU64set) Delete(v uint64) { s.mu.Lock() defer s.mu.Unlock() - if !s.pks.Has(v) { - // double check - return false - } s.pks.Remove(v) - return true } func (s *syncU64set) AddIfNotPresent(v uint64) bool { @@ -85,6 +74,10 @@ func (s *syncU64set) AddIfNotPresent(v uint64) bool { return false } s.mu.RUnlock() + return s.addIfNotPresent(v) +} + +func (s *syncU64set) addIfNotPresent(v uint64) bool { s.mu.Lock() defer s.mu.Unlock() if s.pks.Has(v) { diff --git a/inflight/inflight_test.go b/inflight/inflight_test.go index eb9c6ee5..950bf237 100644 --- a/inflight/inflight_test.go +++ b/inflight/inflight_test.go @@ -23,12 +23,10 @@ func TestDelete(t *testing.T) { flight := newSyncU64set() flight.AddIfNotPresent(10) - if !flight.Delete(10) { + flight.Delete(10) + if flight.pks.Has(10) { t.Error("did not delete the value") } - if flight.Delete(10) { - t.Error("deleted the value twice") - } } func TestAddIfNotPresentSharded(t *testing.T) { @@ -47,12 +45,10 @@ func TestDeleteSharded(t *testing.T) { flight := newShardedSyncU64set() flight.AddIfNotPresent(10) - if !flight.Delete(10) { + flight.Delete(10) + if flight.shards[10%256].pks.Has(10) { t.Error("did not delete the value") } - if flight.Delete(10) { - t.Error("deleted the value twice") - } } func TestInflight(t *testing.T) { @@ -62,7 +58,8 @@ func TestInflight(t 
*testing.T) { return flight.AddIfNotPresent(v) } g := func(v uint64) interface{} { - return flight.Delete(v) + flight.Delete(v) + return !flight.pks.Has(v) } cfg := createQuickConfig() @@ -78,7 +75,8 @@ func TestInflightSharded(t *testing.T) { return flight.AddIfNotPresent(v) } g := func(v uint64) interface{} { - return flight.Delete(v) + flight.Delete(v) + return !flight.shards[v%256].pks.Has(v) } cfg := createQuickConfig() From 482573dae495d7321fcc9b956d935493647ba76f Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Thu, 5 Sep 2019 14:49:37 +0200 Subject: [PATCH 15/17] gemini: moved in-flight set to partition --- generator.go | 14 ++------------ partition.go | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/generator.go b/generator.go index 823321b9..42d5fb12 100644 --- a/generator.go +++ b/generator.go @@ -23,7 +23,6 @@ type DistributionFunc func() TokenIndex type Generator struct { partitions []*Partition - inFlight inflight.InFlight partitionCount uint64 table *Table partitionsConfig PartitionRangeConfig @@ -48,12 +47,12 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge partitions[i] = &Partition{ values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), + inFlight: inflight.New(), t: t, } } gs := &Generator{ partitions: partitions, - inFlight: inflight.New(), partitionCount: config.PartitionsCount, table: table, partitionsConfig: config.PartitionsRangeConfig, @@ -73,12 +72,7 @@ func (g Generator) Get() (ValueWithToken, bool) { default: } partition := g.partitions[uint64(g.idxFunc())%g.partitionCount] - for { - v := partition.pick() - if g.inFlight.AddIfNotPresent(v.Token) { - return v, true - } - } + return partition.get() } // GetOld returns a previously used value and token or a new if @@ -107,10 +101,6 @@ func (g *Generator) GiveOld(v ValueWithToken) { default: } partition := g.partitions[v.Token%g.partitionCount] - if len(v.Value) == 0 { - g.inFlight.Delete(v.Token) - return - } partition.giveOld(v) } diff --git a/partition.go b/partition.go index be713c5e..9730c4b3 100644 --- a/partition.go +++ b/partition.go @@ -1,17 +1,26 @@ package gemini -import "gopkg.in/tomb.v2" +import ( + "github.com/scylladb/gemini/inflight" + "gopkg.in/tomb.v2" +) type Partition struct { values chan ValueWithToken oldValues chan ValueWithToken + inFlight inflight.InFlight t *tomb.Tomb } // get returns a new value and ensures that it's corresponding token // is not already in-flight. func (s *Partition) get() (ValueWithToken, bool) { - return s.pick(), true + for { + v := s.pick() + if s.inFlight.AddIfNotPresent(v.Token) { + return v, true + } + } } var emptyValueWithToken = ValueWithToken{} @@ -31,6 +40,10 @@ func (s *Partition) getOld() (ValueWithToken, bool) { // is empty in which case it removes the corresponding token from the // in-flight tracking. 
 func (s *Partition) giveOld(v ValueWithToken) {
+	if len(v.Value) == 0 {
+		s.inFlight.Delete(v.Token)
+		return
+	}
 	select {
 	case s.oldValues <- v:
 	default:

From 994a7528c9f67ee4eb791c6ff2b5d8eb818f2880 Mon Sep 17 00:00:00 2001
From: Henrik Johansson
Date: Fri, 6 Sep 2019 10:21:38 +0200
Subject: [PATCH 16/17] gemini: picking new partition key if there is no old available

---
 partition.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/partition.go b/partition.go
index 9730c4b3..e55f6e99 100644
--- a/partition.go
+++ b/partition.go
@@ -33,6 +33,8 @@ func (s *Partition) getOld() (ValueWithToken, bool) {
 		return emptyValueWithToken, false
 	case v, ok := <-s.oldValues:
 		return v, ok
+	default:
+		return s.get()
 	}
 }

From c55b2134071149b6387f793a589ca5823ee6e234 Mon Sep 17 00:00:00 2001
From: Henrik Johansson
Date: Fri, 6 Sep 2019 11:35:26 +0200
Subject: [PATCH 17/17] gemini: changelog for lazy partition key generation

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 942b7f85..7a5bdd59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,10 @@
 # Unreleased

 - Lazy partition key generation reintroduced to avoid out of memory issues.
+  This brings in a new CLI arg `--token-range-slices` that defines how many slices the
+  partition keyspace should be divided into when applying the different distribution functions.
+  The default value of this is set to an ad-hoc value of 10000 which should supply ample possibilities
+  for varying selection of values according to the chosen probability distribution.
 - Fix overlapping operations on the same partition key ([#198](https://github.com/scylladb/gemini/issues/198)).
 - Partition keys can now be drawn from various distributions such as ___"zipf"___, ___"uniform"___
   and ___"normal"___. The CLI argument `--partition-key-distribution` is used