Skip to content

Commit

Permalink
Add per scenario global VU iterations
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Mirić committed Feb 24, 2021
1 parent baa0e94 commit 555e1eb
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 8 deletions.
91 changes: 91 additions & 0 deletions core/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,3 +1480,94 @@ func TestExecutionStatsVUSharing(t *testing.T) {
t.Fatal("timed out")
}
}

func TestExecutionStatsScenarioIter(t *testing.T) {
t.Parallel()
script := []byte(`
import exec from 'k6/execution';
import { sleep } from 'k6';
// The carr scenario should reuse the two VUs created for the pvu scenario.
export let options = {
scenarios: {
pvu: {
executor: 'per-vu-iterations',
exec: 'pvu',
vus: 2,
iterations: 5,
gracefulStop: '0s',
},
carr: {
executor: 'constant-arrival-rate',
exec: 'carr',
rate: 10,
timeUnit: '1s',
duration: '1s',
preAllocatedVUs: 2,
maxVUs: 10,
startTime: '2s',
gracefulStop: '0s',
},
},
};
export function pvu() {
sleep(Math.random() * 0.5);
console.log(JSON.stringify(exec.getScenarioStats()));
}
export function carr() {
console.log(JSON.stringify(exec.getScenarioStats()));
};
`)

logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}}
logger.AddHook(&logHook)

runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
)
require.NoError(t, err)

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{})
defer cancel()

scStats := map[string]uint64{}

type logEntry struct {
Name string
Iteration uint64
}

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }()

select {
case err := <-errCh:
require.NoError(t, err)
entries := logHook.Drain()
require.Len(t, entries, 20)
le := &logEntry{}
for _, entry := range entries {
err = json.Unmarshal([]byte(entry.Message), le)
require.NoError(t, err)
scStats[le.Name] = le.Iteration
}
require.Len(t, scStats, 2)
// The global per scenario iteration count should be 9 (iterations
// start at 0), despite VUs being shared or more than 1 being used.
for _, v := range scStats {
assert.Equal(t, uint64(9), v)
}
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}
1 change: 1 addition & 0 deletions js/modules/k6/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (e *Execution) GetScenarioStats(ctx context.Context) (map[string]interface{
"executor": ss.Executor,
"startTime": ss.StartTime,
"progress": progress,
"iteration": ss.GetIter(),
}

return out, nil
Expand Down
1 change: 1 addition & 0 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2083,6 +2083,7 @@ func TestExecutionStats(t *testing.T) {
ProgressFn: func() (float64, []string) {
return 0.1, nil
},
GetIter: func() uint64 { return 1 },
})
vu := initVU.Activate(&lib.VUActivationParams{
RunContext: ctx,
Expand Down
13 changes: 13 additions & 0 deletions lib/executor/base_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type BaseExecutor struct {
config lib.ExecutorConfig
executionState *lib.ExecutionState
localVUID *uint64 // counter for assigning executor-specific VU IDs
localIters *uint64 // counter for keeping track of all VU iterations completed by this executor
logger *logrus.Entry
progress *pb.ProgressBar
}
Expand All @@ -50,6 +51,7 @@ func NewBaseExecutor(config lib.ExecutorConfig, es *lib.ExecutionState, logger *
config: config,
executionState: es,
localVUID: new(uint64),
localIters: new(uint64),
logger: logger,
progress: pb.New(
pb.WithLeft(config.GetName),
Expand Down Expand Up @@ -97,3 +99,14 @@ func (bs BaseExecutor) getMetricTags(vuID *uint64) *stats.SampleTags {
}
return stats.IntoSampleTags(&tags)
}

// GetScenarioIter returns the completed iterations by all VUs for this executor.
func (bs *BaseExecutor) GetScenarioIter() uint64 {
return atomic.LoadUint64(bs.localIters)
}

// IncrScenarioIter increments the counter of completed iterations by all VUs
// for this executor.
func (bs *BaseExecutor) IncrScenarioIter() {
atomic.AddUint64(bs.localIters, 1)
}
3 changes: 2 additions & 1 deletion lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
Executor: car.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: car.GetScenarioIter,
})

activationParams := getVUActivationParams(maxDurationCtx, car.config.BaseConfig,
Expand Down Expand Up @@ -319,7 +320,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
activeVUs <- activateVU(initVU)
}

runIterationBasic := getIterationRunner(car.executionState, car.logger)
runIterationBasic := getIterationRunner(car.executionState, car.IncrScenarioIter, car.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/constant_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ func (clv ConstantVUs) Run(parentCtx context.Context, out chan<- stats.SampleCon
defer activeVUs.Wait()

regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(clv.executionState, clv.logger)
runIteration := getIterationRunner(clv.executionState, clv.IncrScenarioIter, clv.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: clv.config.Name,
Executor: clv.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: clv.GetScenarioIter,
})

activationParams := getVUActivationParams(maxDurationCtx, clv.config.BaseConfig,
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/externally_controlled.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
currentlyPaused: false,
activeVUsCount: new(int64),
maxVUs: new(int64),
runIteration: getIterationRunner(mex.executionState, mex.logger),
runIteration: getIterationRunner(mex.executionState, mex.IncrScenarioIter, mex.logger),
}
*runState.maxVUs = startMaxVUs
if err = runState.retrieveStartMaxVUs(); err != nil {
Expand All @@ -542,6 +542,7 @@ func (mex *ExternallyControlled) Run(parentCtx context.Context, out chan<- stats
Executor: mex.config.Type,
StartTime: time.Now(),
ProgressFn: runState.progressFn,
GetIter: mex.GetScenarioIter,
})

mex.progress.Modify(pb.WithProgress(runState.progressFn)) // Keep track of the progress
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func validateStages(stages []Stage) []error {
//
// TODO: emit the end-of-test iteration metrics here (https://github.com/loadimpact/k6/issues/1250)
func getIterationRunner(
executionState *lib.ExecutionState, logger *logrus.Entry,
executionState *lib.ExecutionState, incrScenarioIter func(), logger *logrus.Entry,
) func(context.Context, lib.ActiveVU) bool {
return func(ctx context.Context, vu lib.ActiveVU) bool {
err := vu.RunOnce()
Expand Down Expand Up @@ -108,6 +108,7 @@ func getIterationRunner(

// TODO: move emission of end-of-iteration metrics here?
executionState.AddFullIterations(1)
incrScenarioIter()
return true
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/per_vu_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ func (pvi PerVUIterations) Run(parentCtx context.Context, out chan<- stats.Sampl
defer activeVUs.Wait()

regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(pvi.executionState, pvi.logger)
runIteration := getIterationRunner(pvi.executionState, pvi.IncrScenarioIter, pvi.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: pvi.config.Name,
Executor: pvi.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: pvi.GetScenarioIter,
})

activationParams := getVUActivationParams(maxDurationCtx, pvi.config.BaseConfig,
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
Executor: varr.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: varr.GetScenarioIter,
})

activationParams := getVUActivationParams(maxDurationCtx, varr.config.BaseConfig,
Expand Down Expand Up @@ -415,7 +416,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
}

regDurationDone := regDurationCtx.Done()
runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
runIterationBasic := getIterationRunner(varr.executionState, varr.IncrScenarioIter, varr.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleCont

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
runIteration := getIterationRunner(vlv.executionState, vlv.logger)
runIteration := getIterationRunner(vlv.executionState, vlv.IncrScenarioIter, vlv.logger)
getVU := func() (lib.InitializedVU, error) {
initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
if err != nil {
Expand All @@ -629,6 +629,7 @@ func (vlv RampingVUs) Run(parentCtx context.Context, out chan<- stats.SampleCont
Executor: vlv.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: vlv.GetScenarioIter,
})
vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
Expand Down
3 changes: 2 additions & 1 deletion lib/executor/shared_iterations.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,14 @@ func (si SharedIterations) Run(parentCtx context.Context, out chan<- stats.Sampl
}()

regDurationDone := regDurationCtx.Done()
runIteration := getIterationRunner(si.executionState, si.logger)
runIteration := getIterationRunner(si.executionState, si.IncrScenarioIter, si.logger)

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: si.config.Name,
Executor: si.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
GetIter: si.GetScenarioIter,
})

activationParams := getVUActivationParams(maxDurationCtx, si.config.BaseConfig,
Expand Down
1 change: 1 addition & 0 deletions lib/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type ScenarioState struct {
Name, Executor string
StartTime time.Time
ProgressFn func() (float64, []string)
GetIter func() uint64
}

// InitVUFunc is just a shorthand so we don't have to type the function
Expand Down

0 comments on commit 555e1eb

Please sign in to comment.