diff --git a/cmd/gemini/jobs.go b/cmd/gemini/jobs.go index 6376c59c..e93f7e4c 100644 --- a/cmd/gemini/jobs.go +++ b/cmd/gemini/jobs.go @@ -15,7 +15,7 @@ import ( // MutationJob continuously applies mutations against the database // for as long as the pump is active. -func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("mutation_job") testStatus := Status{} @@ -30,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche if ind%100000 == 0 { ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger) } else { - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, true, logger) } if i%1000 == 0 { c <- testStatus @@ -46,7 +46,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche // ValidationJob continuously applies validations against the database // for as long as the pump is active. -func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg logger = logger.Named("validation_job") @@ -57,7 +57,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc var i int for hb := range pump { hb.await() - validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger) + validation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -71,7 +71,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc // WarmupJob continuously applies mutations against the database // for as long as the pump is active or the supplied duration expires. -func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { +func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) { schemaConfig := &schemaCfg testStatus := Status{} var i int @@ -90,7 +90,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema c <- testStatus return default: - mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger) + mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, false, logger) if i%1000 == 0 { c <- testStatus testStatus = Status{} @@ -165,8 +165,8 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta } } -func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { - mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes) +func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) { + mutateStmt, err := schema.GenMutateStmt(table, g, r, p, deletes) if err != nil { logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) testStatus.WriteErrors++ @@ -183,7 +183,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig defer func() { v := make(gemini.Value, len(table.PartitionKeys)) copy(v, mutateValues) - source.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) + g.GiveOld(gemini.ValueWithToken{Token: token, Value: v}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) @@ -201,8 +201,8 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig } } -func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, logger *zap.Logger) { - checkStmt := schema.GenCheckStmt(table, source, r, p) +func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) { + checkStmt := schema.GenCheckStmt(table, g, r, p) if checkStmt == nil { if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { w.Write(zap.String("job", "validation")) @@ -213,7 +213,7 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf token, checkValues := checkStmt.Values() defer func() { // Signal done with this pk... - source.GiveOld(gemini.ValueWithToken{Token: token}) + g.GiveOld(gemini.ValueWithToken{Token: token}) }() if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL())) diff --git a/docs/architecture.md b/docs/architecture.md index ed70d626..63c028cc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -55,7 +55,7 @@ in use but the idea is to introduce some jitter into the execution flow. The application generates partition ids through a `Generator` that creates a steady flow of partition key components for the desired [concurrency](architecture.md#Concurrency). -Each goroutine is connected to a `source` that the generator controls. This source continuously emits +Each goroutine is connected to a `partition` that the generator controls. This partition continuously emits new partition ids in the form of a `[]interface{}`. These keys are created in the same way as the the driver does to ensure that each goroutine only processes partition keys from it's designated bucket. These partition keys These values are copied into another list that keeps the old partition ids for diff --git a/generator.go b/generator.go index 194f23a3..e64c6877 100644 --- a/generator.go +++ b/generator.go @@ -22,7 +22,7 @@ type TokenIndex uint64 type DistributionFunc func() TokenIndex type Generator struct { - sources []*Source + partitions []*Partition inFlight inflight.InFlight size uint64 distributionSize uint64 @@ -45,16 +45,16 @@ type GeneratorConfig struct { func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator { t := &tomb.Tomb{} - sources := make([]*Source, config.Size) + partitions := make([]*Partition, config.Size) for i := uint64(0); i < config.Size; i++ { - sources[i] = &Source{ + partitions[i] = &Partition{ values: make(chan ValueWithToken, config.PkUsedBufferSize), oldValues: make(chan ValueWithToken, config.PkUsedBufferSize), t: t, } } gs := &Generator{ - sources: sources, + partitions: partitions, inFlight: inflight.New(), size: config.Size, distributionSize: config.DistributionSize, @@ -75,9 +75,9 @@ func (g Generator) Get() (ValueWithToken, bool) { return emptyValueWithToken, false default: } - source := g.sources[uint64(g.idxFunc())%g.size] + partition := g.partitions[uint64(g.idxFunc())%g.size] for { - v := source.pick() + v := partition.pick() if g.inFlight.AddIfNotPresent(v.Token) { return v, true } @@ -92,7 +92,7 @@ func (g Generator) GetOld() (ValueWithToken, bool) { return emptyValueWithToken, false default: } - return g.sources[uint64(g.idxFunc())%g.size].getOld() + return g.partitions[uint64(g.idxFunc())%g.size].getOld() } type ValueWithToken struct { @@ -109,12 +109,12 @@ func (g *Generator) GiveOld(v ValueWithToken) { return default: } - source := g.sources[v.Token%g.size] + partition := g.partitions[v.Token%g.size] if len(v.Value) == 0 { g.inFlight.Delete(v.Token) return } - source.giveOld(v) + partition.giveOld(v) } func (g *Generator) Stop() { @@ -131,9 +131,9 @@ func (g *Generator) start() { values := g.createPartitionKeyValues(r) hash := hash(routingKeyCreator, g.table, values) idx := hash % g.size - source := g.sources[idx] + partition := g.partitions[idx] select { - case source.values <- ValueWithToken{Token: hash, Value: values}: + case partition.values <- ValueWithToken{Token: hash, Value: values}: case <-g.t.Dying(): g.logger.Info("stopping partition key generation loop") return nil diff --git a/generator_test.go b/generator_test.go index 6989e5a9..3bb69e48 100644 --- a/generator_test.go +++ b/generator_test.go @@ -23,8 +23,8 @@ func TestGenerator(t *testing.T) { Size: 10000, PkUsedBufferSize: 10000, DistributionSize: 1000, - DistributionFunc: func() uint64 { - return atomic.LoadUint64(¤t) + DistributionFunc: func() TokenIndex { + return TokenIndex(atomic.LoadUint64(¤t)) }, } logger, _ := zap.NewDevelopment() diff --git a/source.go b/partition.go similarity index 54% rename from source.go rename to partition.go index d1cf44c4..be713c5e 100644 --- a/source.go +++ b/partition.go @@ -2,23 +2,23 @@ package gemini import "gopkg.in/tomb.v2" -type Source struct { +type Partition struct { values chan ValueWithToken oldValues chan ValueWithToken t *tomb.Tomb } -//get returns a new value and ensures that it's corresponding token -//is not already in-flight. -func (s *Source) get() (ValueWithToken, bool) { +// get returns a new value and ensures that it's corresponding token +// is not already in-flight. +func (s *Partition) get() (ValueWithToken, bool) { return s.pick(), true } var emptyValueWithToken = ValueWithToken{} -//getOld returns a previously used value and token or a new if -//the old queue is empty. -func (s *Source) getOld() (ValueWithToken, bool) { +// getOld returns a previously used value and token or a new if +// the old queue is empty. +func (s *Partition) getOld() (ValueWithToken, bool) { select { case <-s.t.Dying(): return emptyValueWithToken, false @@ -30,14 +30,14 @@ func (s *Source) getOld() (ValueWithToken, bool) { // giveOld returns the supplied value for later reuse unless the value // is empty in which case it removes the corresponding token from the // in-flight tracking. -func (s *Source) giveOld(v ValueWithToken) { +func (s *Partition) giveOld(v ValueWithToken) { select { case s.oldValues <- v: default: - // Old source is full, just drop the value + // Old partition buffer is full, just drop the value } } -func (s *Source) pick() ValueWithToken { +func (s *Partition) pick() ValueWithToken { return <-s.values } diff --git a/schema.go b/schema.go index 51237d14..12b569a2 100644 --- a/schema.go +++ b/schema.go @@ -536,7 +536,7 @@ func (s *Schema) GetCreateSchema() []string { return stmts } -func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -549,7 +549,7 @@ func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p Part typs = append(typs, pk.Type) } - valuesWithToken, ok := source.Get() + valuesWithToken, ok := g.Get() if !ok { return nil, nil } @@ -578,11 +578,11 @@ func (s *Schema) GenInsertStmt(t *Table, source *Generator, r *rand.Rand, p Part }, nil } -func (s *Schema) GenInsertJsonStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenInsertJsonStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() - vals, ok := source.Get() + vals, ok := g.Get() if !ok { return nil, nil } @@ -636,7 +636,7 @@ func (s *Schema) GenInsertJsonStmt(t *Table, source *Generator, r *rand.Rand, p }, nil } -func (s *Schema) GenDeleteRows(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { +func (s *Schema) GenDeleteRows(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() @@ -649,7 +649,7 @@ func (s *Schema) GenDeleteRows(t *Table, source *Generator, r *rand.Rand, p Part typs = append(typs, pk.Type) } - vs, ok := source.Get() + vs, ok := g.Get() if !ok { return nil, nil } @@ -681,30 +681,30 @@ func (s *Schema) GenDDLStmt(t *Table, r *rand.Rand, p PartitionRangeConfig, sc * } } -func (s *Schema) GenMutateStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { +func (s *Schema) GenMutateStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig, deletes bool) (*Stmt, error) { t.mu.RLock() defer t.mu.RUnlock() if !deletes { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } switch n := rand.Intn(1000); n { case 10, 100: - return s.GenDeleteRows(t, source, r, p) + return s.GenDeleteRows(t, g, r, p) default: switch n := rand.Intn(2); n { case 0: if t.KnownIssues[KnownIssuesJsonWithTuples] { - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } - return s.GenInsertJsonStmt(t, source, r, p) + return s.GenInsertJsonStmt(t, g, r, p) default: - return s.GenInsertStmt(t, source, r, p) + return s.GenInsertStmt(t, g, r, p) } } } -func (s *Schema) GenCheckStmt(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) GenCheckStmt(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { var n int if len(t.Indexes) > 0 { n = r.Intn(5) @@ -713,27 +713,27 @@ func (s *Schema) GenCheckStmt(t *Table, source *Generator, r *rand.Rand, p Parti } switch n { case 0: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) case 1: - return s.genMultiplePartitionQuery(t, source, r, p) + return s.genMultiplePartitionQuery(t, g, r, p) case 2: - return s.genClusteringRangeQuery(t, source, r, p) + return s.genClusteringRangeQuery(t, g, r, p) case 3: - return s.genMultiplePartitionClusteringRangeQuery(t, source, r, p) + return s.genMultiplePartitionClusteringRangeQuery(t, g, r, p) case 4: // Reducing the probability to hit these since they often take a long time to run n := r.Intn(5) switch n { case 0: - return s.genSingleIndexQuery(t, source, r, p) + return s.genSingleIndexQuery(t, g, r, p) default: - return s.genSinglePartitionQuery(t, source, r, p) + return s.genSinglePartitionQuery(t, g, r, p) } } return nil } -func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSinglePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -752,7 +752,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Ra builder = builder.Where(qb.Eq(pk.Name)) typs = append(typs, pk.Type) } - values, ok := source.GetOld() + values, ok := g.GetOld() if !ok { return nil } @@ -765,7 +765,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, source *Generator, r *rand.Ra } } -func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -791,7 +791,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand. for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -808,7 +808,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, source *Generator, r *rand. } } -func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -827,7 +827,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Ra clusteringKeys = t.MaterializedViews[view].ClusteringKeys }*/ builder := qb.Select(s.Keyspace.Name + "." + tableName) - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { // Done or no values available... return nil @@ -858,7 +858,7 @@ func (s *Schema) genClusteringRangeQuery(t *Table, source *Generator, r *rand.Ra } } -func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock() @@ -885,7 +885,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Gene for i, pk := range partitionKeys { builder = builder.Where(qb.InTuple(pk.Name, pkNum)) for j := 0; j < pkNum; j++ { - vs, ok := source.GetOld() + vs, ok := g.GetOld() if !ok { return nil } @@ -914,7 +914,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, source *Gene } } -func (s *Schema) genSingleIndexQuery(t *Table, source *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { +func (s *Schema) genSingleIndexQuery(t *Table, g *Generator, r *rand.Rand, p PartitionRangeConfig) *Stmt { t.mu.RLock() defer t.mu.RUnlock()