gemini: lazy generation of partition keys #200

Merged · 17 commits · Sep 6, 2019
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

 # Unreleased

+- Lazy partition key generation reintroduced to avoid out-of-memory issues.
+  This brings in a new CLI arg `--token-range-slices` that defines how many slices
+  the partition keyspace is divided into when applying the different distribution
+  functions. The default is an ad-hoc value of 10000, which should give the chosen
+  probability distribution ample values to select from.
 - Fix overlapping operations on the same partition key ([#198](https://github.com/scylladb/gemini/issues/198)).
 - Partition keys can now be drawn from various distributions such as ___"zipf"___,
   ___"uniform"___ and ___"normal"___. The CLI argument `--partition-key-distribution` is used
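A note on the new flag for readers skimming the changelog: dividing the token space into `--token-range-slices` buckets lets the distribution choose *which bucket* a partition key comes from. The snippet below only illustrates that idea under invented names (`slices`, `sliceWidth`); it is not gemini's implementation.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
)

func main() {
	const slices = 10000 // mirrors the default of --token-range-slices
	r := rand.New(rand.NewSource(1))
	// s=1.5 skews draws heavily toward low slice indices,
	// i.e. a small set of "hot" partitions.
	zipf := rand.NewZipf(r, 1.5, 1.0, slices-1)

	// Map the drawn slice index to the first token of that slice.
	sliceWidth := uint64(math.MaxUint64) / slices
	idx := zipf.Uint64()
	fmt.Printf("slice %d -> tokens starting at %d\n", idx, idx*sliceWidth)
}
```

With 10000 slices there is enough granularity for a zipf, uniform, or normal distribution to produce visibly different access patterns without keeping per-key state in memory.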
17 changes: 8 additions & 9 deletions cmd/gemini/generators.go
@@ -5,23 +5,22 @@ import (
"go.uber.org/zap"
)

func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generators {
func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, distributionFunc gemini.DistributionFunc, actors, distributionSize uint64, logger *zap.Logger) []*gemini.Generator {
partitionRangeConfig := gemini.PartitionRangeConfig{
MaxBlobLength: schemaConfig.MaxBlobLength,
MinBlobLength: schemaConfig.MinBlobLength,
MaxStringLength: schemaConfig.MaxStringLength,
MinStringLength: schemaConfig.MinStringLength,
}

var gs []*gemini.Generators
var gs []*gemini.Generator
for _, table := range schema.Tables {
gCfg := &gemini.GeneratorsConfig{
Partitions: partitionRangeConfig,
Size: actors,
DistributionSize: distributionSize,
DistributionFunc: distributionFunc,
Seed: seed,
PkUsedBufferSize: pkBufferReuseSize,
gCfg := &gemini.GeneratorConfig{
PartitionsRangeConfig: partitionRangeConfig,
PartitionsCount: distributionSize,
PartitionsDistributionFunc: distributionFunc,
Seed: seed,
PkUsedBufferSize: pkBufferReuseSize,
}
g := gemini.NewGenerator(table, gCfg, logger.Named("generator"))
gs = append(gs, g)
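The singular `GeneratorConfig` above pairs with the lazy generation this PR reintroduces: keys are produced on demand into a bounded buffer (`PkUsedBufferSize`) rather than precomputed for the whole keyspace. A minimal sketch of the buffering idea, with invented names rather than gemini's actual types:

```go
// lazySource produces values on demand into a small bounded buffer.
// The producer blocks once the buffer is full, so memory use stays
// constant no matter how large the logical keyspace is.
type lazySource struct {
	ch chan uint64
}

func newLazySource(buf int, next func() uint64) *lazySource {
	s := &lazySource{ch: make(chan uint64, buf)}
	go func() {
		for {
			s.ch <- next() // blocks while buf values remain unconsumed
		}
	}()
	return s
}

// Get hands out the next lazily generated value.
func (s *lazySource) Get() uint64 { return <-s.ch }
```

A real implementation also needs a shutdown path for the producer goroutine, which is what the `Generator.Stop` call added to the pump callback further down provides.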
59 changes: 28 additions & 31 deletions cmd/gemini/jobs.go
@@ -4,71 +4,74 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"sync"
 	"time"

 	"github.com/scylladb/gemini"
 	"github.com/scylladb/gemini/store"
 	"go.uber.org/zap"
 	"golang.org/x/exp/rand"
+	"gopkg.in/tomb.v2"
 )

 // MutationJob continuously applies mutations against the database
 // for as long as the pump is active.
-func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	logger = logger.Named("mutation_job")
 	testStatus := Status{}
+	defer func() {
+		// Send any remaining updates back
+		c <- testStatus
+	}()
 	var i int
 	for hb := range pump {
 		hb.await()
 		ind := r.Intn(1000000)
 		if ind%100000 == 0 {
 			ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger)
 		} else {
-			mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger)
+			mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, true, logger)
 		}
 		if i%1000 == 0 {
 			c <- testStatus
 			testStatus = Status{}
 		}
 		if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
 			c <- testStatus
-			break
+			return
 		}
 		i++
 	}
 }

 // ValidationJob continuously applies validations against the database
 // for as long as the pump is active.
-func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	logger = logger.Named("validation_job")

 	testStatus := Status{}
+	defer func() {
+		c <- testStatus
+	}()
 	var i int
 	for hb := range pump {
 		hb.await()
-		validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger)
+		validation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, logger)
 		if i%1000 == 0 {
 			c <- testStatus
 			testStatus = Status{}
 		}
 		if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
 			c <- testStatus
-			break
+			return
 		}
 		i++
 	}
 }

 // WarmupJob continuously applies mutations against the database
 // for as long as the pump is active or the supplied duration expires.
-func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	testStatus := Status{}
 	var i int
@@ -87,7 +90,7 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s
 			c <- testStatus
 			return
 		default:
-			mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger)
+			mutation(ctx, schema, schemaConfig, table, s, r, p, g, &testStatus, false, logger)
 			if i%1000 == 0 {
 				c <- testStatus
 				testStatus = Status{}
@@ -96,16 +99,8 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s
 		}
 	}
 }

-func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generators, result chan Status, logger *zap.Logger) {
-	defer done.Done()
-	var finished sync.WaitGroup
-	finished.Add(1)
-
-	// Wait group for the worker goroutines.
-	var workers sync.WaitGroup
+func job(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
 	workerCtx, _ := context.WithCancel(context.Background())
-	workers.Add(len(schema.Tables) * int(actors))
-
 	partitionRangeConfig := gemini.PartitionRangeConfig{
 		MaxBlobLength:   schemaConfig.MaxBlobLength,
 		MinBlobLength:   schemaConfig.MinBlobLength,
@@ -114,13 +109,15 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema,
 	}

 	for j, table := range schema.Tables {
+		g := generators[j]
 		for i := 0; i < int(actors); i++ {
 			r := rand.New(rand.NewSource(seed))
-			go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, generators[j].Get(i), result, mode, warmup, logger)
+			t.Go(func() error {
+				f(workerCtx, pump.ch, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger)
+				return nil
+			})
 		}
 	}
-
-	workers.Wait()
 }

 func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) {
@@ -168,8 +165,8 @@ func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, ta
 	}
 }

-func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, deletes bool, logger *zap.Logger) {
-	mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes)
+func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, deletes bool, logger *zap.Logger) {
+	mutateStmt, err := schema.GenMutateStmt(table, g, r, p, deletes)
 	if err != nil {
 		logger.Error("Failed! Mutation statement generation failed", zap.Error(err))
 		testStatus.WriteErrors++
@@ -186,7 +183,7 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
 	defer func() {
 		v := make(gemini.Value, len(table.PartitionKeys))
 		copy(v, mutateValues)
-		source.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
+		g.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
 	}()
 	if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
 		w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
@@ -204,8 +201,8 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
 	}
 }

-func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, logger *zap.Logger) {
-	checkStmt := schema.GenCheckStmt(table, source, r, p)
+func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, g *gemini.Generator, testStatus *Status, logger *zap.Logger) {
+	checkStmt := schema.GenCheckStmt(table, g, r, p)
 	if checkStmt == nil {
 		if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
 			w.Write(zap.String("job", "validation"))
@@ -216,7 +213,7 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf
 	token, checkValues := checkStmt.Values()
 	defer func() {
 		// Signal done with this pk...
-		source.GiveOld(gemini.ValueWithToken{Token: token})
+		g.GiveOld(gemini.ValueWithToken{Token: token})
 	}()
 	if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
 		w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL()))
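Two details in this file carry the fix for [#198](https://github.com/scylladb/gemini/issues/198): `mutation` and `validation` now draw partition keys through the shared `*gemini.Generator`, and every key is handed back with `g.GiveOld` in a `defer`. Schematically, with illustrative types standing in for gemini's:

```go
type valueWithToken struct {
	Token uint64
	Value []interface{}
}

// source is the narrow view of a generator that a job needs.
type source interface {
	Get() valueWithToken      // check a partition key out
	GiveOld(v valueWithToken) // check it back in when finished
}

// withPartitionKey guarantees the key is returned even when the
// work using it fails, so no two jobs ever hold the same key.
func withPartitionKey(g source, work func(valueWithToken) error) error {
	v := g.Get()
	defer g.GiveOld(v)
	return work(v)
}
```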
32 changes: 17 additions & 15 deletions cmd/gemini/pump.go
@@ -7,12 +7,14 @@
"time"

"github.com/briandowns/spinner"
"github.com/scylladb/gemini"
"go.uber.org/zap"
"gopkg.in/tomb.v2"
)

type Pump struct {
ch chan heartBeat
done chan struct{}
t *tomb.Tomb
graceful chan os.Signal
logger *zap.Logger
}
@@ -28,29 +30,26 @@ func (hb heartBeat) await() {
 }

 func (p *Pump) Start(d time.Duration, postFunc func()) {
-	go func() {
+	p.t.Go(func() error {
 		defer p.cleanup(postFunc)
 		timer := time.NewTimer(d)
 		for {
 			select {
-			case <-p.done:
+			case <-p.t.Dying():
 				p.logger.Info("Test run stopped. Exiting.")
-				return
+				return nil
 			case <-p.graceful:
 				p.logger.Info("Test run aborted. Exiting.")
-				return
+				p.t.Kill(nil)
+				return nil
 			case <-timer.C:
 				p.logger.Info("Test run completed. Exiting.")
-				return
+				p.t.Kill(nil)
+				return nil
 			case p.ch <- newHeartBeat():
 			}
 		}
-	}()
-}
-
-func (p *Pump) Stop() {
-	p.logger.Debug("pump asked to stop")
-	p.done <- struct{}{}
+	})
 }

 func (p *Pump) cleanup(postFunc func()) {
@@ -61,23 +60,26 @@ func (p *Pump) cleanup(postFunc func()) {
 	postFunc()
 }

-func createPump(sz int, logger *zap.Logger) *Pump {
+func createPump(t *tomb.Tomb, sz int, logger *zap.Logger) *Pump {
 	logger = logger.Named("pump")
 	var graceful = make(chan os.Signal, 1)
 	signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT)
 	pump := &Pump{
 		ch:       make(chan heartBeat, sz),
-		done:     make(chan struct{}, 1),
+		t:        t,
 		graceful: graceful,
 		logger:   logger,
 	}
 	return pump
 }

-func createPumpCallback(result chan Status, sp *spinner.Spinner) func() {
+func createPumpCallback(generators []*gemini.Generator, result chan Status, sp *spinner.Spinner) func() {
 	return func() {
 		if sp != nil {
 			sp.Stop()
 		}
+		for _, g := range generators {
+			g.Stop()
+		}
 	}
 }
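The structural change in this file is replacing the `done` channel plus `Stop` method with a `tomb.Tomb` shared by the pump and the workers spawned in `jobs.go`. The standalone sketch below shows the `tomb.v2` calls the diff relies on (`Go`, `Dying`, `Kill`, `Wait`); it is illustrative and not gemini code.

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func main() {
	var t tomb.Tomb // the zero value is ready for use
	for i := 0; i < 3; i++ {
		t.Go(func() error { // tracked, like the pump and each worker
			for {
				select {
				case <-t.Dying(): // closed once Kill is called
					return nil
				default:
					time.Sleep(10 * time.Millisecond) // pretend work
				}
			}
		})
	}
	t.Kill(nil)                   // what the pump does on completion/abort
	fmt.Println("err:", t.Wait()) // blocks until every t.Go goroutine exits
}
```

One tomb thus subsumes both the old `done` channel (signalling) and the `sync.WaitGroup` (joining) that this PR deletes.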