gemini: tomb package used to coordinate shutdown
Hand-wiring goroutine termination was messy. Using the package
gopkg.in/tomb.v2 makes it much simpler, safer and nicer.

This commit fixes a deadlock that sometimes happened during heavy
operations and external shutdown.
Henrik Johansson committed Sep 4, 2019
1 parent 84dbd28 commit 583d711
Showing 8 changed files with 125 additions and 88 deletions.
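For readers unfamiliar with the package: tomb.v2 tracks every goroutine started through Tomb.Go, lets any of them (or an outside caller) request shutdown with Kill, exposes that request as the Dying channel, and blocks in Wait until all tracked goroutines have returned. Below is a minimal, self-contained sketch of that lifecycle — the timings and messages are illustrative, not code from this commit:

package main

import (
	"fmt"
	"time"

	"gopkg.in/tomb.v2"
)

func main() {
	var t tomb.Tomb // the zero value is ready to use

	// Go registers a tracked goroutine; the tomb records its exit and error.
	t.Go(func() error {
		for {
			select {
			case <-t.Dying():
				// Shutdown was requested; returning lets Wait unblock.
				return nil
			case <-time.After(100 * time.Millisecond):
				fmt.Println("tick")
			}
		}
	})

	time.Sleep(300 * time.Millisecond)
	t.Kill(nil) // Kill(nil) asks all tracked goroutines to stop cleanly

	// Wait blocks until every goroutine started with Go has returned.
	if err := t.Wait(); err != nil {
		fmt.Println("shutdown error:", err)
	}
}

This is the same Go/Kill/Dying/Wait shape the diffs below install in the pump and the job runners.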
40 changes: 18 additions & 22 deletions cmd/gemini/jobs.go
@@ -4,22 +4,25 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/scylladb/gemini"
 	"github.com/scylladb/gemini/store"
 	"go.uber.org/zap"
 	"golang.org/x/exp/rand"
+	"gopkg.in/tomb.v2"
 )
 
 // MutationJob continuously applies mutations against the database
 // for as long as the pump is active.
-func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	logger = logger.Named("mutation_job")
 	testStatus := Status{}
+	defer func() {
+		// Send any remaining updates back
+		c <- testStatus
+	}()
 	var i int
 	for hb := range pump {
 		hb.await()
@@ -35,20 +38,22 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup,
 		}
 		if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
 			c <- testStatus
-			break
+			return
 		}
 		i++
 	}
 }
 
 // ValidationJob continuously applies validations against the database
 // for as long as the pump is active.
-func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	logger = logger.Named("validation_job")
 
 	testStatus := Status{}
+	defer func() {
+		c <- testStatus
+	}()
 	var i int
 	for hb := range pump {
 		hb.await()
@@ -58,17 +63,15 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou
 			testStatus = Status{}
 		}
 		if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
-			c <- testStatus
-			break
+			return
 		}
 		i++
 	}
 }
 
 // WarmupJob continuously applies mutations against the database
 // for as long as the pump is active or the supplied duration expires.
-func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
-	defer wg.Done()
+func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
 	schemaConfig := &schemaCfg
 	testStatus := Status{}
 	var i int
@@ -96,16 +99,8 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s
 	}
 }
 
-func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
-	defer done.Done()
-	var finished sync.WaitGroup
-	finished.Add(1)
-
-	// Wait group for the worker goroutines.
-	var workers sync.WaitGroup
+func job(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
 	workerCtx, _ := context.WithCancel(context.Background())
-	workers.Add(len(schema.Tables) * int(actors))
-
 	partitionRangeConfig := gemini.PartitionRangeConfig{
 		MaxBlobLength: schemaConfig.MaxBlobLength,
 		MinBlobLength: schemaConfig.MinBlobLength,
@@ -117,11 +112,12 @@ func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema,
 		g := generators[j]
 		for i := 0; i < int(actors); i++ {
 			r := rand.New(rand.NewSource(seed))
-			go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger)
+			t.Go(func() error {
+				f(workerCtx, pump.ch, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger)
+				return nil
+			})
 		}
 	}
-
-	workers.Wait()
 }
 
 func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) {
32 changes: 17 additions & 15 deletions cmd/gemini/pump.go
@@ -7,12 +7,14 @@ import (
 	"time"
 
 	"github.com/briandowns/spinner"
+	"github.com/scylladb/gemini"
 	"go.uber.org/zap"
+	"gopkg.in/tomb.v2"
 )
 
 type Pump struct {
 	ch       chan heartBeat
-	done     chan struct{}
+	t        *tomb.Tomb
 	graceful chan os.Signal
 	logger   *zap.Logger
 }
@@ -28,29 +30,26 @@ func (hb heartBeat) await() {
 }
 
 func (p *Pump) Start(d time.Duration, postFunc func()) {
-	go func() {
+	p.t.Go(func() error {
 		defer p.cleanup(postFunc)
 		timer := time.NewTimer(d)
 		for {
 			select {
-			case <-p.done:
+			case <-p.t.Dying():
 				p.logger.Info("Test run stopped. Exiting.")
-				return
+				return nil
 			case <-p.graceful:
 				p.logger.Info("Test run aborted. Exiting.")
-				return
+				p.t.Kill(nil)
+				return nil
 			case <-timer.C:
 				p.logger.Info("Test run completed. Exiting.")
-				return
+				p.t.Kill(nil)
+				return nil
 			case p.ch <- newHeartBeat():
 			}
 		}
-	}()
-}
-
-func (p *Pump) Stop() {
-	p.logger.Debug("pump asked to stop")
-	p.done <- struct{}{}
+	})
 }
 
 func (p *Pump) cleanup(postFunc func()) {
@@ -61,23 +60,26 @@ func (p *Pump) cleanup(postFunc func()) {
 	postFunc()
 }
 
-func createPump(sz int, logger *zap.Logger) *Pump {
+func createPump(t *tomb.Tomb, sz int, logger *zap.Logger) *Pump {
 	logger = logger.Named("pump")
 	var graceful = make(chan os.Signal, 1)
 	signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT)
 	pump := &Pump{
 		ch:       make(chan heartBeat, sz),
-		done:     make(chan struct{}, 1),
+		t:        t,
 		graceful: graceful,
 		logger:   logger,
 	}
 	return pump
 }
 
-func createPumpCallback(result chan Status, sp *spinner.Spinner) func() {
+func createPumpCallback(generators []*gemini.Generator, result chan Status, sp *spinner.Spinner) func() {
 	return func() {
 		if sp != nil {
 			sp.Stop()
 		}
+		for _, g := range generators {
+			g.Stop()
+		}
 	}
 }
82 changes: 52 additions & 30 deletions cmd/gemini/root.go
@@ -11,7 +11,6 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"strings"
-	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -28,6 +27,7 @@ import (
 	"golang.org/x/exp/rand"
 	"golang.org/x/net/context"
 	"gonum.org/v1/gonum/stat/distuv"
+	"gopkg.in/tomb.v2"
 )
 
 var (
@@ -88,7 +88,7 @@ func interactive() bool {
 	return !nonInteractive
 }
 
-type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger)
+type testJob func(context.Context, <-chan heartBeat, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger)
 
 func readSchema(confFile string) (*gemini.Schema, error) {
 	byteValue, err := ioutil.ReadFile(confFile)
@@ -205,30 +205,25 @@ func run(cmd *cobra.Command, args []string) error {
 		}
 	}
 
-	done := &sync.WaitGroup{}
-	done.Add(1)
+	t := &tomb.Tomb{}
 	result := make(chan Status, 10000)
 	endResult := make(chan Status, 1)
-	pump := createPump(10000, logger)
+	pump := createPump(t, 10000, logger)
 	generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger)
-	go func() {
-		defer func() {
-			for _, g := range generators {
-				g.Stop()
-			}
-		}()
-		defer done.Done()
+	t.Go(func() error {
 		var sp *spinner.Spinner = nil
 		if interactive() {
 			sp = createSpinner()
 		}
-		pump.Start(duration+warmup, createPumpCallback(result, sp))
-		endResult <- sampleStatus(pump, result, sp, logger)
-	}()
+		pump.Start(duration+warmup, createPumpCallback(generators, result, sp))
+		endResult <- sampleStatus(result, sp, logger)
+		return nil
+	})
 
 	launch(schema, schemaConfig, store, pump, generators, result, logger)
 	close(result)
-	done.Wait()
+	logger.Info("result channel closed")
+	_ = t.Wait()
 	res := <-endResult
 	res.PrintResult(outFile, schema)
 	if res.HasErrors() {
@@ -280,29 +275,56 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl
 }
 
 func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
-	if warmup > 0 {
-		done := &sync.WaitGroup{}
-		done.Add(1)
-		job(done, WarmupJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
-		done.Wait()
-		logger.Info("Warmup done")
+	if doWarmup(schema, schemaConfig, store, pump, generators, result, logger) {
+		logger.Info("doWarmup terminates launch")
+		return
 	}
-	done := &sync.WaitGroup{}
-	done.Add(1)
+	t := &tomb.Tomb{}
 	switch mode {
 	case writeMode:
-		go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
+		entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob)
 	case readMode:
-		go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
+		entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, ValidationJob)
 	default:
-		done.Add(1)
-		go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
-		go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
+		entombJobs(t, concurrency, schema, schemaConfig, store, pump, generators, result, logger, MutationJob, ValidationJob)
 	}
-	done.Wait()
+	_ = t.Wait()
 	logger.Info("All jobs complete")
 }
 
+func doWarmup(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) bool {
+	if warmup > 0 {
+		t := &tomb.Tomb{}
+		entombJobs(t, concurrency, schema, schemaConfig, s, pump, generators, result, logger, WarmupJob)
+		_ = t.Wait()
+		logger.Info("Warmup done")
+		select {
+		case <-pump.t.Dying():
+			logger.Info("Warmup dying")
+			return true
+		default:
+			logger.Info("Warmup not dying")
+		}
+	}
+	return false
+}
+
+func wrapJobInTombFunc(t *tomb.Tomb, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) func() error {
+	return func() error {
+		job(t, f, concurrency, schema, schemaConfig, s, pump, generators, result, logger)
+		return nil
+	}
+}
+
+func entombJobs(t *tomb.Tomb, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger, fs ...testJob) {
+	t.Go(func() error {
+		for _, f := range fs {
+			t.Go(wrapJobInTombFunc(t, f, actors, schema, schemaConfig, s, pump, generators, result, logger))
+		}
+		return nil
+	})
+}
+
 func createLogger(level string) *zap.Logger {
 	lvl := zap.NewAtomicLevel()
 	if err := lvl.UnmarshalText([]byte(level)); err != nil {
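A note on the entombJobs shape above: tomb.v2's Wait returns once every goroutine registered via Go has exited, and Go may be called from inside an already-tracked goroutine. Wrapping the fan-out loop in its own t.Go therefore keeps the tomb alive while workers are still being registered. A minimal sketch of the same pattern, with hypothetical worker functions (not code from this commit):

package main

import (
	"fmt"

	"gopkg.in/tomb.v2"
)

// fanOut mirrors entombJobs: the outer t.Go keeps the tomb alive
// while the loop registers each worker with its own t.Go.
func fanOut(t *tomb.Tomb, workers ...func() error) {
	t.Go(func() error {
		for _, w := range workers {
			t.Go(w)
		}
		return nil
	})
}

func main() {
	t := &tomb.Tomb{}
	fanOut(t,
		func() error { fmt.Println("mutate"); return nil },
		func() error { fmt.Println("validate"); return nil },
	)
	_ = t.Wait() // returns once every registered worker has exited
}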
4 changes: 2 additions & 2 deletions cmd/gemini/status.go
@@ -70,7 +70,7 @@ func (r Status) HasErrors() bool {
 	return r.WriteErrors > 0 || r.ReadErrors > 0
 }
 
-func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status {
+func sampleStatus(c chan Status, sp *spinner.Spinner, logger *zap.Logger) Status {
 	failfastDone := sync.Once{}
 	logger = logger.Named("sample_results")
 	var testRes Status
@@ -83,8 +83,8 @@ func sampleStatus(p *Pump, c chan Status, sp *spinner.Spinner, logger *zap.Logge
 			if failFast {
 				failfastDone.Do(func() {
 					logger.Warn("Errors detected. Exiting.")
-					p.Stop()
 				})
+				return testRes
 			}
 		}
 	}
(Diffs for the remaining four changed files were not loaded in this view.)
