Skip to content

Commit

Permalink
gemini: tomb package used to coordinate shutdown
Browse files Browse the repository at this point in the history
Hand wiring goroutine termination was messy. Using the package
gopkg.in/tomb.v2 makes it much simpler, safer and nicer.

This commit fixes a deadlock that happened sometimes diring heavy
operations and external shutdown.
  • Loading branch information
Henrik Johansson committed Sep 4, 2019
1 parent 84dbd28 commit f27170f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 52 deletions.
43 changes: 21 additions & 22 deletions cmd/gemini/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/scylladb/gemini"
"github.com/scylladb/gemini/store"
"go.uber.org/zap"
"golang.org/x/exp/rand"
"gopkg.in/tomb.v2"
)

// MutationJob continuously applies mutations against the database
// for as long as the pump is active.
func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
func MutationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
logger = logger.Named("mutation_job")
testStatus := Status{}
defer func() {
// Send any remaining updates back
c <- testStatus
}()
var i int
for hb := range pump {
hb.await()
Expand All @@ -35,20 +38,22 @@ func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup,
}
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
c <- testStatus
break
return
}
i++
}
}

// ValidationJob continuously applies validations against the database
// for as long as the pump is active.
func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
func ValidationJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
logger = logger.Named("validation_job")

testStatus := Status{}
defer func() {
c <- testStatus
}()
var i int
for hb := range pump {
hb.await()
Expand All @@ -58,17 +63,15 @@ func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGrou
testStatus = Status{}
}
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
c <- testStatus
break
return
}
i++
}
}

// WarmupJob continuously applies mutations against the database
// for as long as the pump is active or the supplied duration expires.
func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
func WarmupJob(ctx context.Context, pump <-chan heartBeat, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Generator, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
schemaConfig := &schemaCfg
testStatus := Status{}
var i int
Expand Down Expand Up @@ -96,32 +99,28 @@ func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, s
}
}

func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
defer done.Done()
var finished sync.WaitGroup
finished.Add(1)

// Wait group for the worker goroutines.
var workers sync.WaitGroup
func job(f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
workerCtx, _ := context.WithCancel(context.Background())
workers.Add(len(schema.Tables) * int(actors))

partitionRangeConfig := gemini.PartitionRangeConfig{
MaxBlobLength: schemaConfig.MaxBlobLength,
MinBlobLength: schemaConfig.MinBlobLength,
MaxStringLength: schemaConfig.MaxStringLength,
MinStringLength: schemaConfig.MinStringLength,
}

var t tomb.Tomb
for j, table := range schema.Tables {
g := generators[j]
for i := 0; i < int(actors); i++ {
r := rand.New(rand.NewSource(seed))
go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger)
tf := func() error {
f(workerCtx, pump.ch, schema, schemaConfig, table, s, r, partitionRangeConfig, g, result, mode, warmup, logger)
return nil
}
t.Go(tf)
}
}

workers.Wait()
_ = t.Wait()
}

func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) {
Expand Down
42 changes: 22 additions & 20 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
_ "net/http/pprof"
"os"
"strings"
"sync"
"text/tabwriter"
"time"

Expand All @@ -28,6 +27,7 @@ import (
"golang.org/x/exp/rand"
"golang.org/x/net/context"
"gonum.org/v1/gonum/stat/distuv"
"gopkg.in/tomb.v2"
)

var (
Expand Down Expand Up @@ -88,7 +88,7 @@ func interactive() bool {
return !nonInteractive
}

type testJob func(context.Context, <-chan heartBeat, *sync.WaitGroup, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger)
type testJob func(context.Context, <-chan heartBeat, *gemini.Schema, gemini.SchemaConfig, *gemini.Table, store.Store, *rand.Rand, gemini.PartitionRangeConfig, *gemini.Generator, chan Status, string, time.Duration, *zap.Logger)

func readSchema(confFile string) (*gemini.Schema, error) {
byteValue, err := ioutil.ReadFile(confFile)
Expand Down Expand Up @@ -205,30 +205,29 @@ func run(cmd *cobra.Command, args []string) error {
}
}

done := &sync.WaitGroup{}
done.Add(1)
var t tomb.Tomb
result := make(chan Status, 10000)
endResult := make(chan Status, 1)
pump := createPump(10000, logger)
generators := createGenerators(schema, schemaConfig, distFunc, concurrency, distributionSize, logger)
go func() {
t.Go(func() error {
defer func() {
for _, g := range generators {
g.Stop()
}
}()
defer done.Done()
var sp *spinner.Spinner = nil
if interactive() {
sp = createSpinner()
}
pump.Start(duration+warmup, createPumpCallback(result, sp))
endResult <- sampleStatus(pump, result, sp, logger)
}()
return nil
})

launch(schema, schemaConfig, store, pump, generators, result, logger)
close(result)
done.Wait()
_ = t.Wait()
res := <-endResult
res.PrintResult(outFile, schema)
if res.HasErrors() {
Expand Down Expand Up @@ -281,28 +280,31 @@ func createDistributionFunc(distribution string, size, seed uint64, mu, sigma fl

func launch(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, store store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) {
if warmup > 0 {
done := &sync.WaitGroup{}
done.Add(1)
job(done, WarmupJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
done.Wait()
job(WarmupJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
logger.Info("Warmup done")
}
done := &sync.WaitGroup{}
done.Add(1)

var t tomb.Tomb
switch mode {
case writeMode:
go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
t.Go(wrapJobInTombFunc(MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger))
case readMode:
go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
t.Go(wrapJobInTombFunc(ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger))
default:
done.Add(1)
go job(done, MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
go job(done, ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger)
t.Go(wrapJobInTombFunc(MutationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger))
t.Go(wrapJobInTombFunc(ValidationJob, concurrency, schema, schemaConfig, store, pump, generators, result, logger))
}
done.Wait()
_ = t.Wait()
logger.Info("All jobs complete")
}

func wrapJobInTombFunc(f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generator, result chan Status, logger *zap.Logger) func() error {
return func() error {
job(f, concurrency, schema, schemaConfig, s, pump, generators, result, logger)
return nil
}
}

func createLogger(level string) *zap.Logger {
lvl := zap.NewAtomicLevel()
if err := lvl.UnmarshalText([]byte(level)); err != nil {
Expand Down
8 changes: 2 additions & 6 deletions generator.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package gemini

import (
"sync"

"github.com/scylladb/gemini/inflight"

"github.com/scylladb/gemini/murmur"
"go.uber.org/zap"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -48,11 +45,10 @@ type GeneratorConfig struct {
func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Generator {
sources := make([]*Source, config.Size)
for i := uint64(0); i < config.Size; i++ {
done := &sync.WaitGroup{}
done.Add(1)
sources[i] = &Source{
values: make(chan ValueWithToken, config.PkUsedBufferSize),
oldValues: make(chan ValueWithToken, config.PkUsedBufferSize),
doneCh: make(chan struct{}, 1),
}
}
gs := &Generator{
Expand Down Expand Up @@ -107,7 +103,7 @@ func (g *Generator) GiveOld(v ValueWithToken) {
func (g *Generator) Stop() {
g.doneCh <- struct{}{}
for _, s := range g.sources {
close(s.oldValues)
close(s.doneCh)
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
gonum.org/v1/gonum v0.0.0-20190724133715-a8659125a966
gopkg.in/inf.v0 v0.9.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)

replace github.com/gocql/gocql => github.com/scylladb/gocql v1.2.0
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
10 changes: 6 additions & 4 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gemini
type Source struct {
values chan ValueWithToken
oldValues chan ValueWithToken
doneCh chan struct{}
}

//get returns a new value and ensures that it's corresponding token
Expand All @@ -11,16 +12,16 @@ func (s *Source) get() (ValueWithToken, bool) {
return s.pick(), true
}

var emptyValueWithToken = ValueWithToken{}

//getOld returns a previously used value and token or a new if
//the old queue is empty.
func (s *Source) getOld() (ValueWithToken, bool) {
select {
case v, ok := <-s.oldValues:
return v, ok
/*default:
// There are no old values so we generate a new
return s.get()
*/
case <-s.doneCh:
return emptyValueWithToken, false
}
}

Expand All @@ -30,6 +31,7 @@ func (s *Source) getOld() (ValueWithToken, bool) {
func (s *Source) giveOld(v ValueWithToken) {
select {
case s.oldValues <- v:
case <-s.doneCh:
default:
// Old source is full, just drop the value
}
Expand Down

0 comments on commit f27170f

Please sign in to comment.