From cd0366519f697da8419a2959633758c855b9e1be Mon Sep 17 00:00:00 2001 From: Henrik Johansson Date: Wed, 3 Jul 2019 15:16:17 +0200 Subject: [PATCH] gemini: shutdown not waiting for warmup --- CHANGELOG.md | 1 + cmd/gemini/pump.go | 73 ++++++++++++++++++++++------------------------ cmd/gemini/root.go | 15 ++++++---- 3 files changed, 46 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30302a51..b4729b5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- Shutdown is no longer waiting for the warmup phase to complete. - Partition keys can now be any supported type. - The size of the partition key buffers can be configured on the commandline through `partition-key-buffer-size` and `partition-key-buffer-reuse-size`. diff --git a/cmd/gemini/pump.go b/cmd/gemini/pump.go index eef5daa3..9dfa6536 100644 --- a/cmd/gemini/pump.go +++ b/cmd/gemini/pump.go @@ -1,6 +1,7 @@ package main import ( + "context" "os" "os/signal" "sync" @@ -9,24 +10,29 @@ import ( "github.com/briandowns/spinner" "go.uber.org/zap" - "golang.org/x/net/context" ) type Pump struct { - ch chan heartBeat - ctx context.Context - cancel context.CancelFunc + ch chan heartBeat + done chan struct{} + graceful chan os.Signal + logger *zap.Logger } -func (p *Pump) Start(postFunc func()) { +func (p *Pump) Start(d time.Duration, postFunc func()) { go func() { + defer p.cleanup(postFunc) + timer := time.NewTimer(d) for { select { - case <-p.ctx.Done(): - close(p.ch) - for range p.ch { - } - postFunc() + case <-p.done: + p.logger.Info("Test run stopped. Exiting.") + return + case <-p.graceful: + p.logger.Info("Test run aborted. Exiting.") + return + case <-timer.C: + p.logger.Info("Test run completed. Exiting.") return default: p.ch <- newHeartBeat() @@ -36,46 +42,37 @@ func (p *Pump) Start(postFunc func()) { } func (p *Pump) Stop() { - p.cancel() + p.logger.Debug("pump asked to stop") + p.done <- struct{}{} } -func createPump(sz int, d time.Duration, logger *zap.Logger) *Pump { - logger = logger.Named("pump") - // Gracefully terminate - var gracefulStop = make(chan os.Signal) - signal.Notify(gracefulStop, syscall.SIGTERM, syscall.SIGINT) +func (p *Pump) cleanup(postFunc func()) { + close(p.ch) + for range p.ch { + } + p.logger.Debug("pump channel drained") + postFunc() +} - // Create the actual pump - pumpCh := make(chan heartBeat, sz) - pumpCtx, pumpCancel := context.WithCancel(context.Background()) +func createPump(sz int, logger *zap.Logger) *Pump { + logger = logger.Named("pump") + var graceful = make(chan os.Signal, 1) + signal.Notify(graceful, syscall.SIGTERM, syscall.SIGINT) pump := &Pump{ - ch: pumpCh, - ctx: pumpCtx, - cancel: pumpCancel, + ch: make(chan heartBeat, sz), + done: make(chan struct{}, 1), + graceful: graceful, + logger: logger, } - go func(d time.Duration) { - timer := time.NewTimer(d) - for { - select { - case <-gracefulStop: - pump.Stop() - logger.Info("Test run aborted. Exiting.") - return - case <-timer.C: - pump.Stop() - logger.Info("Test run completed. Exiting.") - return - } - } - }(duration + warmup) return pump } -func createPumpCallback(c chan Status, wg *sync.WaitGroup, sp *spinner.Spinner) func() { +func createPumpCallback(cancel context.CancelFunc, c chan Status, wg *sync.WaitGroup, sp *spinner.Spinner) func() { return func() { if sp != nil { sp.Stop() } + cancel() wg.Wait() close(c) } diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index da90a0b9..3fed45fa 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -347,14 +347,14 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, // Wait group for the worker goroutines. var workers sync.WaitGroup - workerCtx, _ := context.WithCancel(context.Background()) + workerCtx, workerCancel := context.WithCancel(context.Background()) workers.Add(len(schema.Tables) * int(concurrency)) // Wait group for the finished goroutine. var finished sync.WaitGroup finished.Add(1) - pump := createPump(10000, duration+warmup, logger) + pump := createPump(10000, logger) partitionRangeConfig := gemini.PartitionRangeConfig{ MaxBlobLength: schemaConfig.MaxBlobLength, @@ -386,7 +386,7 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, if interactive() { sp = createSpinner() } - pump.Start(createPumpCallback(c, &workers, sp)) + pump.Start(duration+warmup, createPumpCallback(workerCancel, c, &workers, sp)) res := sampleResults(pump, c, sp, logger) res.PrintResult(out) for _, g := range gs { @@ -546,8 +546,13 @@ func Job(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema warmup: for { select { - case <-ctx.Done(): - return + case _, ok := <-pump: + if !ok { + logger.Debug("job terminated") + return + } + } + select { case <-warmupTimer.C: break warmup default: