Skip to content

Commit

Permalink
Merge pull request #158 from scylladb/correct_shutdown
Browse files Browse the repository at this point in the history
gemini: shutdown not waiting for warmup
  • Loading branch information
Henrik Johansson authored Jul 3, 2019
2 parents 8bf6e7e + cd03665 commit 75e3707
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- Shutdown is no longer waiting for the warmup phase to complete.
- Partition keys can now be any supported type.
- The size of the partition key buffers can be configured on the commandline through
`partition-key-buffer-size` and `partition-key-buffer-reuse-size`.
Expand Down
73 changes: 35 additions & 38 deletions cmd/gemini/pump.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"os"
"os/signal"
"sync"
Expand All @@ -9,24 +10,29 @@ import (

"github.com/briandowns/spinner"
"go.uber.org/zap"
"golang.org/x/net/context"
)

type Pump struct {
ch chan heartBeat
ctx context.Context
cancel context.CancelFunc
ch chan heartBeat
done chan struct{}
graceful chan os.Signal
logger *zap.Logger
}

func (p *Pump) Start(postFunc func()) {
func (p *Pump) Start(d time.Duration, postFunc func()) {
go func() {
defer p.cleanup(postFunc)
timer := time.NewTimer(d)
for {
select {
case <-p.ctx.Done():
close(p.ch)
for range p.ch {
}
postFunc()
case <-p.done:
p.logger.Info("Test run stopped. Exiting.")
return
case <-p.graceful:
p.logger.Info("Test run aborted. Exiting.")
return
case <-timer.C:
p.logger.Info("Test run completed. Exiting.")
return
default:
p.ch <- newHeartBeat()
Expand All @@ -36,46 +42,37 @@ func (p *Pump) Start(postFunc func()) {
}

func (p *Pump) Stop() {
p.cancel()
p.logger.Debug("pump asked to stop")
p.done <- struct{}{}
}

func createPump(sz int, d time.Duration, logger *zap.Logger) *Pump {
logger = logger.Named("pump")
// Gracefully terminate
var gracefulStop = make(chan os.Signal)
signal.Notify(gracefulStop, syscall.SIGTERM, syscall.SIGINT)
func (p *Pump) cleanup(postFunc func()) {
close(p.ch)
for range p.ch {
}
p.logger.Debug("pump channel drained")
postFunc()
}

// Create the actual pump
pumpCh := make(chan heartBeat, sz)
pumpCtx, pumpCancel := context.WithCancel(context.Background())
func createPump(sz int, logger *zap.Logger) *Pump {
logger = logger.Named("pump")
var graceful = make(chan os.Signal, 1)
signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT)
pump := &Pump{
ch: pumpCh,
ctx: pumpCtx,
cancel: pumpCancel,
ch: make(chan heartBeat, sz),
done: make(chan struct{}, 1),
graceful: graceful,
logger: logger,
}
go func(d time.Duration) {
timer := time.NewTimer(d)
for {
select {
case <-gracefulStop:
pump.Stop()
logger.Info("Test run aborted. Exiting.")
return
case <-timer.C:
pump.Stop()
logger.Info("Test run completed. Exiting.")
return
}
}
}(duration + warmup)
return pump
}

func createPumpCallback(c chan Status, wg *sync.WaitGroup, sp *spinner.Spinner) func() {
func createPumpCallback(cancel context.CancelFunc, c chan Status, wg *sync.WaitGroup, sp *spinner.Spinner) func() {
return func() {
if sp != nil {
sp.Stop()
}
cancel()
wg.Wait()
close(c)
}
Expand Down
15 changes: 10 additions & 5 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,14 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig gemini.SchemaConfig,

// Wait group for the worker goroutines.
var workers sync.WaitGroup
workerCtx, _ := context.WithCancel(context.Background())
workerCtx, workerCancel := context.WithCancel(context.Background())
workers.Add(len(schema.Tables) * int(concurrency))

// Wait group for the finished goroutine.
var finished sync.WaitGroup
finished.Add(1)

pump := createPump(10000, duration+warmup, logger)
pump := createPump(10000, logger)

partitionRangeConfig := gemini.PartitionRangeConfig{
MaxBlobLength: schemaConfig.MaxBlobLength,
Expand Down Expand Up @@ -386,7 +386,7 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig gemini.SchemaConfig,
if interactive() {
sp = createSpinner()
}
pump.Start(createPumpCallback(c, &workers, sp))
pump.Start(duration+warmup, createPumpCallback(workerCancel, c, &workers, sp))
res := sampleResults(pump, c, sp, logger)
res.PrintResult(out)
for _, g := range gs {
Expand Down Expand Up @@ -546,8 +546,13 @@ func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema
warmup:
for {
select {
case <-ctx.Done():
return
case _, ok := <-pump:
if !ok {
logger.Debug("job terminated")
return
}
}
select {
case <-warmupTimer.C:
break warmup
default:
Expand Down

0 comments on commit 75e3707

Please sign in to comment.