From 555e1eb15cdd7815ea2892f42ad53f422581f5e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Miri=C4=87?= Date: Tue, 23 Feb 2021 16:19:42 +0100 Subject: [PATCH] Add per scenario global VU iterations --- core/local/local_test.go | 91 +++++++++++++++++++++++++++ js/modules/k6/execution/execution.go | 1 + js/runner_test.go | 1 + lib/executor/base_executor.go | 13 ++++ lib/executor/constant_arrival_rate.go | 3 +- lib/executor/constant_vus.go | 3 +- lib/executor/externally_controlled.go | 3 +- lib/executor/helpers.go | 3 +- lib/executor/per_vu_iterations.go | 3 +- lib/executor/ramping_arrival_rate.go | 3 +- lib/executor/ramping_vus.go | 3 +- lib/executor/shared_iterations.go | 3 +- lib/executors.go | 1 + 13 files changed, 123 insertions(+), 8 deletions(-) diff --git a/core/local/local_test.go b/core/local/local_test.go index 97c9ba7dbf1..1c213300e32 100644 --- a/core/local/local_test.go +++ b/core/local/local_test.go @@ -1480,3 +1480,94 @@ func TestExecutionStatsVUSharing(t *testing.T) { t.Fatal("timed out") } } + +func TestExecutionStatsScenarioIter(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + import { sleep } from 'k6'; + + // The carr scenario should reuse the two VUs created for the pvu scenario. + export let options = { + scenarios: { + pvu: { + executor: 'per-vu-iterations', + exec: 'pvu', + vus: 2, + iterations: 5, + gracefulStop: '0s', + }, + carr: { + executor: 'constant-arrival-rate', + exec: 'carr', + rate: 10, + timeUnit: '1s', + duration: '1s', + preAllocatedVUs: 2, + maxVUs: 10, + startTime: '2s', + gracefulStop: '0s', + }, + }, + }; + + export function pvu() { + sleep(Math.random() * 0.5); + console.log(JSON.stringify(exec.getScenarioStats())); + } + + export function carr() { + console.log(JSON.stringify(exec.getScenarioStats())); + }; +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + defer cancel() + + scStats := map[string]uint64{} + + type logEntry struct { + Name string + Iteration uint64 + } + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }() + + select { + case err := <-errCh: + require.NoError(t, err) + entries := logHook.Drain() + require.Len(t, entries, 20) + le := &logEntry{} + for _, entry := range entries { + err = json.Unmarshal([]byte(entry.Message), le) + require.NoError(t, err) + scStats[le.Name] = le.Iteration + } + require.Len(t, scStats, 2) + // The global per scenario iteration count should be 9 (iterations + // start at 0), despite VUs being shared or more than 1 being used. + for _, v := range scStats { + assert.Equal(t, uint64(9), v) + } + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } +} diff --git a/js/modules/k6/execution/execution.go b/js/modules/k6/execution/execution.go index 3286acec22c..d8d50e31da5 100644 --- a/js/modules/k6/execution/execution.go +++ b/js/modules/k6/execution/execution.go @@ -71,6 +71,7 @@ func (e *Execution) GetScenarioStats(ctx context.Context) (map[string]interface{ "executor": ss.Executor, "startTime": ss.StartTime, "progress": progress, + "iteration": ss.GetIter(), } return out, nil diff --git a/js/runner_test.go b/js/runner_test.go index 98e220002f0..3e75178849e 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -2083,6 +2083,7 @@ func TestExecutionStats(t *testing.T) { ProgressFn: func() (float64, []string) { return 0.1, nil }, + GetIter: func() uint64 { return 1 }, }) vu := initVU.Activate(&lib.VUActivationParams{ RunContext: ctx, diff --git a/lib/executor/base_executor.go b/lib/executor/base_executor.go index 7d9b543303f..5e2901444f3 100644 --- a/lib/executor/base_executor.go +++ b/lib/executor/base_executor.go @@ -40,6 +40,7 @@ type BaseExecutor struct { config lib.ExecutorConfig executionState *lib.ExecutionState localVUID *uint64 // counter for assigning executor-specific VU IDs + localIters *uint64 // counter for keeping track of all VU iterations completed by this executor logger *logrus.Entry progress *pb.ProgressBar } @@ -50,6 +51,7 @@ func NewBaseExecutor(config lib.ExecutorConfig, es *lib.ExecutionState, logger * config: config, executionState: es, localVUID: new(uint64), + localIters: new(uint64), logger: logger, progress: pb.New( pb.WithLeft(config.GetName), @@ -97,3 +99,14 @@ func (bs BaseExecutor) getMetricTags(vuID *uint64) *stats.SampleTags { } return stats.IntoSampleTags(&tags) } + +// GetScenarioIter returns the completed iterations by all VUs for this executor. +func (bs *BaseExecutor) GetScenarioIter() uint64 { + return atomic.LoadUint64(bs.localIters) +} + +// IncrScenarioIter increments the counter of completed iterations by all VUs +// for this executor. +func (bs *BaseExecutor) IncrScenarioIter() { + atomic.AddUint64(bs.localIters, 1) +} diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index c2e7b2858eb..bc8957f41a8 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -270,6 +270,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S Executor: car.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: car.GetScenarioIter, }) activationParams := getVUActivationParams(maxDurationCtx, car.config.BaseConfig, @@ -319,7 +320,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S activeVUs <- activateVU(initVU) } - runIterationBasic := getIterationRunner(car.executionState, car.logger) + runIterationBasic := getIterationRunner(car.executionState, car.IncrScenarioIter, car.logger) runIteration := func(vu lib.ActiveVU) { runIterationBasic(maxDurationCtx, vu) activeVUs <- vu diff --git a/lib/executor/constant_vus.go b/lib/executor/constant_vus.go index ddf54e10cdf..f703d0ffe19 100644 --- a/lib/executor/constant_vus.go +++ b/lib/executor/constant_vus.go @@ -174,13 +174,14 @@ func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleCon defer activeVUs.Wait() regDurationDone := regDurationCtx.Done() - runIteration := getIterationRunner(clv.executionState, clv.logger) + runIteration := getIterationRunner(clv.executionState, clv.IncrScenarioIter, clv.logger) maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ Name: clv.config.Name, Executor: clv.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: clv.GetScenarioIter, }) activationParams := getVUActivationParams(maxDurationCtx, clv.config.BaseConfig, diff --git a/lib/executor/externally_controlled.go b/lib/executor/externally_controlled.go index 7d1315e8bc4..34712581064 100644 --- a/lib/executor/externally_controlled.go +++ b/lib/executor/externally_controlled.go @@ -530,7 +530,7 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats currentlyPaused: false, activeVUsCount: new(int64), maxVUs: new(int64), - runIteration: getIterationRunner(mex.executionState, mex.logger), + runIteration: getIterationRunner(mex.executionState, mex.IncrScenarioIter, mex.logger), } *runState.maxVUs = startMaxVUs if err = runState.retrieveStartMaxVUs(); err != nil { @@ -542,6 +542,7 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats Executor: mex.config.Type, StartTime: time.Now(), ProgressFn: runState.progressFn, + GetIter: mex.GetScenarioIter, }) mex.progress.Modify(pb.WithProgress(runState.progressFn)) // Keep track of the progress diff --git a/lib/executor/helpers.go b/lib/executor/helpers.go index 3a145c7f9e4..2ccf8365c98 100644 --- a/lib/executor/helpers.go +++ b/lib/executor/helpers.go @@ -80,7 +80,7 @@ func validateStages(stages []Stage) []error { // // TODO: emit the end-of-test iteration metrics here (https://github.com/loadimpact/k6/issues/1250) func getIterationRunner( - executionState *lib.ExecutionState, logger *logrus.Entry, + executionState *lib.ExecutionState, incrScenarioIter func(), logger *logrus.Entry, ) func(context.Context, lib.ActiveVU) bool { return func(ctx context.Context, vu lib.ActiveVU) bool { err := vu.RunOnce() @@ -108,6 +108,7 @@ func getIterationRunner( // TODO: move emission of end-of-iteration metrics here? executionState.AddFullIterations(1) + incrScenarioIter() return true } } diff --git a/lib/executor/per_vu_iterations.go b/lib/executor/per_vu_iterations.go index 5bc06c3a810..e7d501dc745 100644 --- a/lib/executor/per_vu_iterations.go +++ b/lib/executor/per_vu_iterations.go @@ -198,13 +198,14 @@ func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.Sampl defer activeVUs.Wait() regDurationDone := regDurationCtx.Done() - runIteration := getIterationRunner(pvi.executionState, pvi.logger) + runIteration := getIterationRunner(pvi.executionState, pvi.IncrScenarioIter, pvi.logger) maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ Name: pvi.config.Name, Executor: pvi.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: pvi.GetScenarioIter, }) activationParams := getVUActivationParams(maxDurationCtx, pvi.config.BaseConfig, diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index 3f2f68fc6aa..47a930ed685 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -365,6 +365,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S Executor: varr.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: varr.GetScenarioIter, }) activationParams := getVUActivationParams(maxDurationCtx, varr.config.BaseConfig, @@ -415,7 +416,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S } regDurationDone := regDurationCtx.Done() - runIterationBasic := getIterationRunner(varr.executionState, varr.logger) + runIterationBasic := getIterationRunner(varr.executionState, varr.IncrScenarioIter, varr.logger) runIteration := func(vu lib.ActiveVU) { runIterationBasic(maxDurationCtx, vu) activeVUs <- vu diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 20c3ffae30e..9d63009d813 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -604,7 +604,7 @@ func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleCont // Actually schedule the VUs and iterations, likely the most complicated // executor among all of them... - runIteration := getIterationRunner(vlv.executionState, vlv.logger) + runIteration := getIterationRunner(vlv.executionState, vlv.IncrScenarioIter, vlv.logger) getVU := func() (lib.InitializedVU, error) { initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false) if err != nil { @@ -629,6 +629,7 @@ func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleCont Executor: vlv.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: vlv.GetScenarioIter, }) vuHandles := make([]*vuHandle, maxVUs) for i := uint64(0); i < maxVUs; i++ { diff --git a/lib/executor/shared_iterations.go b/lib/executor/shared_iterations.go index ec5c8d9c40e..8bc734f4351 100644 --- a/lib/executor/shared_iterations.go +++ b/lib/executor/shared_iterations.go @@ -230,13 +230,14 @@ func (si SharedIterations) Run(parentCtx context.Context, out chan<- stats.Sampl }() regDurationDone := regDurationCtx.Done() - runIteration := getIterationRunner(si.executionState, si.logger) + runIteration := getIterationRunner(si.executionState, si.IncrScenarioIter, si.logger) maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ Name: si.config.Name, Executor: si.config.Type, StartTime: startTime, ProgressFn: progressFn, + GetIter: si.GetScenarioIter, }) activationParams := getVUActivationParams(maxDurationCtx, si.config.BaseConfig, diff --git a/lib/executors.go b/lib/executors.go index 1ef59633751..6274a6cafd3 100644 --- a/lib/executors.go +++ b/lib/executors.go @@ -115,6 +115,7 @@ type ScenarioState struct { Name, Executor string StartTime time.Time ProgressFn func() (float64, []string) + GetIter func() uint64 } // InitVUFunc is just a shorthand so we don't have to type the function