Commit
gemini: renamed source to partition
Henrik Johansson committed Sep 5, 2019
1 parent 0cc70d5 commit d541e7d
Showing 6 changed files with 64 additions and 64 deletions.
24 changes: 12 additions & 12 deletions cmd/gemini/jobs.go
@@ -15,7 +15,7 @@ import (

// MutationJob continuously applies mutations against the database
// for as long as the pump is active.
func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
logger = logger.Named("mutation_job")
testStatus := Status{}
@@ -30,7 +30,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche
if ind%100000 == 0 {
ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger)
} else {
mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger)
mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, true, logger)
}
if i%1000 == 0 {
c <- testStatus
@@ -46,7 +46,7 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sche

// ValidationJob continuously applies validations against the database
// for as long as the pump is active.
func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
logger = logger.Named("validation_job")

@@ -57,7 +57,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc
var i int
for hb := range pump {
hb.await()
validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger)
validation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, logger)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
@@ -71,7 +71,7 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Sc

// WarmupJob continuously applies mutations against the database
// for as long as the pump is active or the supplied duration expires.
func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
testStatus := Status{}
var i int
@@ -90,7 +90,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema
c <- testStatus
return
default:
mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger)
mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, false, logger)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
@@ -165,8 +165,8 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta
}
}

func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) {
mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes)
func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) {
mutateStmt, err := schema.GenMutateStmt(table, g, r, p, deletes)
if err != nil {
logger.Error("Failed! Mutation statement generation failed", zap.Error(err))
testStatus.WriteErrors++
@@ -183,7 +183,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
defer func() {
v := make(gemini.Value, len(table.PartitionKeys))
copy(v, mutateValues)
source.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
g.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
}()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
@@ -201,8 +201,8 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
}
}

func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, testStatus *Status, logger *zap.Logger) {
checkStmt := schema.GenCheckStmt(table, source, r, p)
func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) {
checkStmt := schema.GenCheckStmt(table, g, r, p)
if checkStmt == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "validation"))
@@ -213,7 +213,7 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf
token, checkValues := checkStmt.Values()
defer func() {
// Signal done with this pk...
source.GiveOld(gemini.ValueWithToken{Token: token})
g.GiveOld(gemini.ValueWithToken{Token: token})
}()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL()))
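The mutation and validation helpers above share one borrow-and-return lifecycle: generate a statement for a partition key handed out by the generator, execute it, and always give the token back in a deferred call. A condensed sketch of that lifecycle is below; the module path and the execution step are assumptions, and it presumes the mutate statement exposes `Values()` the same way the validation path does.

```go
package sketch

import (
	"math/rand"

	"github.com/scylladb/gemini" // assumed module path
)

// applyOneMutation is a condensed, illustrative version of a single mutation step.
func applyOneMutation(schema *gemini.Schema, table *gemini.Table, g *gemini.Generator,
	r *rand.Rand, p gemini.PartitionRangeConfig) error {
	stmt, err := schema.GenMutateStmt(table, g, r, p, true)
	if err != nil {
		return err
	}
	token, values := stmt.Values()
	defer func() {
		// Hand the partition key back so the generator can recycle it; an
		// empty Value would instead clear the in-flight marker for this token.
		v := make(gemini.Value, len(table.PartitionKeys))
		copy(v, values)
		g.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
	}()
	// ... execute stmt against the store here, counting ops and write errors ...
	return nil
}
```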
2 changes: 1 addition & 1 deletion docs/architecture.md
@@ -55,7 +55,7 @@ in use but the idea is to introduce some jitter into the execution flow.

The application generates partition ids through a `Generator` that creates a steady flow of partition
key components for the desired [concurrency](architecture.md#Concurrency).
Each goroutine is connected to a `source` that the generator controls. This source continuously emits
Each goroutine is connected to a `partition` that the generator controls. This partition continuously emits
new partition ids in the form of a `[]interface{}`. These keys are created in the same way the
driver does, to ensure that each goroutine only processes partition keys from its designated bucket.
These values are copied into another list that keeps the old partition ids for
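A minimal sketch of the flow this paragraph describes, using the exported `Generator` API shown in generator.go below. The worker body is illustrative and the module path is an assumption.

```go
package sketch

import "github.com/scylladb/gemini" // assumed module path

// worker drains partition ids from the generator until it shuts down.
func worker(g *gemini.Generator) {
	for {
		v, ok := g.Get() // a ValueWithToken picked from one of the partitions
		if !ok {
			return // the generator is being stopped
		}
		// ... build and execute a statement for this partition key ...
		g.GiveOld(v) // hand the key back so it can be reused as an "old" value
	}
}
```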
22 changes: 11 additions & 11 deletions generator.go
@@ -22,7 +22,7 @@ type TokenIndex uint64
type DistributionFunc func() TokenIndex

type Generator struct {
sources []*Source
partitions []*Partition
inFlight inflight.InFlight
size uint64
distributionSize uint64
@@ -45,16 +45,16 @@ type GeneratorConfig struct {

func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator {
t := &tomb.Tomb{}
sources := make([]*Source, config.Size)
partitions := make([]*Partition, config.Size)
for i := uint64(0); i < config.Size; i++ {
sources[i] = &Source{
partitions[i] = &Partition{
values: make(chan ValueWithToken, config.PkUsedBufferSize),
oldValues: make(chan ValueWithToken, config.PkUsedBufferSize),
t: t,
}
}
gs := &Generator{
sources: sources,
partitions: partitions,
inFlight: inflight.New(),
size: config.Size,
distributionSize: config.DistributionSize,
@@ -75,9 +75,9 @@ func (g Generator) Get() (ValueWithToken, bool) {
return emptyValueWithToken, false
default:
}
source := g.sources[uint64(g.idxFunc())%g.size]
partition := g.partitions[uint64(g.idxFunc())%g.size]
for {
v := source.pick()
v := partition.pick()
if g.inFlight.AddIfNotPresent(v.Token) {
return v, true
}
@@ -92,7 +92,7 @@ func (g Generator) GetOld() (ValueWithToken, bool) {
return emptyValueWithToken, false
default:
}
return g.sources[uint64(g.idxFunc())%g.size].getOld()
return g.partitions[uint64(g.idxFunc())%g.size].getOld()
}

type ValueWithToken struct {
@@ -109,12 +109,12 @@ func (g *Generator) GiveOld(v ValueWithToken) {
return
default:
}
source := g.sources[v.Token%g.size]
partition := g.partitions[v.Token%g.size]
if len(v.Value) == 0 {
g.inFlight.Delete(v.Token)
return
}
source.giveOld(v)
partition.giveOld(v)
}

func (g *Generator) Stop() {
@@ -131,9 +131,9 @@ func (g *Generator) start() {
values := g.createPartitionKeyValues(r)
hash := hash(routingKeyCreator, g.table, values)
idx := hash % g.size
source := g.sources[idx]
partition := g.partitions[idx]
select {
case source.values <- ValueWithToken{Token: hash, Value: values}:
case partition.values <- ValueWithToken{Token: hash, Value: values}:
case <-g.t.Dying():
g.logger.Info("stopping partition key generation loop")
return nil
4 changes: 2 additions & 2 deletions generator_test.go
@@ -23,8 +23,8 @@ func TestGenerator(t *testing.T) {
Size: 10000,
PkUsedBufferSize: 10000,
DistributionSize: 1000,
DistributionFunc: func() uint64 {
return atomic.LoadUint64(&current)
DistributionFunc: func() TokenIndex {
return TokenIndex(atomic.LoadUint64(&current))
},
}
logger, _ := zap.NewDevelopment()
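The test change reflects the `DistributionFunc` signature, which now returns a `TokenIndex` rather than a raw `uint64`. A hedged construction sketch follows; the uniform distribution, module path, and surrounding helper are illustrative and not taken from the test.

```go
package sketch

import (
	"math/rand"

	"github.com/scylladb/gemini" // assumed module path
	"go.uber.org/zap"
)

// newExampleGenerator shows one way to wire up a Generator with the new
// TokenIndex-returning distribution function.
func newExampleGenerator(table *gemini.Table) *gemini.Generator {
	logger, _ := zap.NewDevelopment()
	cfg := &gemini.GeneratorConfig{
		Size:             10000,
		PkUsedBufferSize: 10000,
		DistributionSize: 1000,
		// DistributionFunc now returns a TokenIndex instead of a raw uint64;
		// here it picks uniformly within DistributionSize.
		DistributionFunc: func() gemini.TokenIndex {
			return gemini.TokenIndex(rand.Uint64() % 1000)
		},
	}
	return gemini.NewGenerator(table, cfg, logger)
}
```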
20 changes: 10 additions & 10 deletions source.go → partition.go
@@ -2,23 +2,23 @@ package gemini

import "gopkg.in/tomb.v2"

type Source struct {
type Partition struct {
values chan ValueWithToken
oldValues chan ValueWithToken
t *tomb.Tomb
}

//get returns a new value and ensures that it's corresponding token
//is not already in-flight.
func (s *Source) get() (ValueWithToken, bool) {
// get returns a new value and ensures that its corresponding token
// is not already in-flight.
func (s *Partition) get() (ValueWithToken, bool) {
return s.pick(), true
}

var emptyValueWithToken = ValueWithToken{}

//getOld returns a previously used value and token or a new if
//the old queue is empty.
func (s *Source) getOld() (ValueWithToken, bool) {
// getOld returns a previously used value and token, or a new one if
// the old queue is empty.
func (s *Partition) getOld() (ValueWithToken, bool) {
select {
case <-s.t.Dying():
return emptyValueWithToken, false
@@ -30,14 +30,14 @@ func (s *Source) getOld() (ValueWithToken, bool) {
// giveOld returns the supplied value for later reuse unless the value
// is empty in which case it removes the corresponding token from the
// in-flight tracking.
func (s *Source) giveOld(v ValueWithToken) {
func (s *Partition) giveOld(v ValueWithToken) {
select {
case s.oldValues <- v:
default:
// Old source is full, just drop the value
// Old partition buffer is full, just drop the value
}
}

func (s *Source) pick() ValueWithToken {
func (s *Partition) pick() ValueWithToken {
return <-s.values
}
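`giveOld` deliberately uses a non-blocking send so that a full reuse buffer never stalls the caller; the value is simply dropped. The same pattern in isolation, as a generic sketch:

```go
package sketch

// nonBlockingSend mirrors the giveOld pattern above: try to buffer a value
// for later reuse, but never block the caller when the buffer is full.
func nonBlockingSend(buf chan<- int, v int) bool {
	select {
	case buf <- v:
		return true // stored for reuse
	default:
		return false // buffer full: drop the value, exactly as giveOld does
	}
}
```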