diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index ba2f174e391..b53d1ee89f5 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -101,6 +101,7 @@ func TestPatchStatus(t *testing.T) { } for name, testCase := range testData { + name, testCase := name, testCase t.Run(name, func(t *testing.T) { t.Parallel() @@ -136,6 +137,9 @@ func TestPatchStatus(t *testing.T) { RunState: testState, } + stopEmission, err := execScheduler.Init(runCtx, samples) + require.NoError(t, err) + wg := &sync.WaitGroup{} wg.Add(1) defer func() { @@ -146,6 +150,7 @@ func TestPatchStatus(t *testing.T) { go func() { assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal") + stopEmission() close(samples) wg.Done() }() diff --git a/cmd/run.go b/cmd/run.go index d1338ba1d94..c9029950223 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -274,6 +274,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, onHardStop) defer stopSignalHandling() + // Initialize the VUs and executors + stopVUEmission, err := execScheduler.Init(runCtx, samples) + if err != nil { + return err + } + defer stopVUEmission() + if conf.Linger.Bool { defer func() { msg := "The test is done, but --linger was enabled, so k6 is waiting for Ctrl+C to continue..." @@ -291,8 +298,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } - // Initialize VUs and start the test! However, we won't immediately return - // if there was an error, we still have things to do. + // Start the test! However, we won't immediately return if there was an + // error, we still have things to do. err = execScheduler.Run(globalCtx, runCtx, samples) // Init has passed successfully, so unless disabled, make sure we send a diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 79bbc1c1d77..d409e8668e7 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -1020,15 +1020,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) { export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} ` - t.Run("noLinger", func(t *testing.T) { - t.Parallel() - testAbortedByScriptTestAbort(t, script, runTestWithNoLinger) - }) - - t.Run("withLinger", func(t *testing.T) { - t.Parallel() - testAbortedByScriptTestAbort(t, script, runTestWithLinger) - }) + testAbortedByScriptTestAbort(t, script, runTestWithNoLinger) } func TestAbortedByScriptAbortInVUCode(t *testing.T) { diff --git a/execution/scheduler.go b/execution/scheduler.go index 5cb724351ff..8a5f18bef58 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -372,6 +372,36 @@ func (e *Scheduler) runExecutor( runResults <- err } +// Init concurrently initializes all of the planned VUs and then sequentially +// initializes all of the configured executors. It also starts the measurement +// and emission of the `vus` and `vus_max` metrics. +func (e *Scheduler) Init( + runCtx context.Context, samplesOut chan<- metrics.SampleContainer, +) (stopVUEmission func(), err error) { + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") + + execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx) + waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut) + stopVUEmission = func() { + logger.Debugf("Stopping vus and vux_max metrics emission...") + execSchedRunCancel() + waitForVUsMetricPush() + } + + defer func() { + if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil { + logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err) + e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted) + err = interruptErr + } + if err != nil { + stopVUEmission() + } + }() + + return stopVUEmission, e.initVUsAndExecutors(execSchedRunCtx, samplesOut) +} + // Run the Scheduler, funneling all generated metric samples through the supplied // out channel. // @@ -379,11 +409,6 @@ func (e *Scheduler) runExecutor( func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") - execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx) - waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut) - defer waitForVUsMetricPush() - defer execSchedRunCancel() - defer func() { if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil { logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err) @@ -392,10 +417,6 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } }() - if err := e.initVUsAndExecutors(execSchedRunCtx, samplesOut); err != nil { - return err - } - e.initProgress.Modify(pb.WithConstLeft("Run")) if e.state.IsPaused() { logger.Debug("Execution is paused, waiting for resume or interrupt...") @@ -404,7 +425,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met select { case <-e.state.ResumeNotify(): // continue - case <-execSchedRunCtx.Done(): + case <-runCtx.Done(): return nil } } @@ -422,7 +443,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met // TODO: get rid of this context, pass the e.state directly to VUs when they // are initialized by e.initVUsAndExecutors(). This will also give access to // its properties in their init context executions. - withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state) + withExecStateCtx := lib.WithExecutionState(runCtx, e.state) // Run setup() before any executors, if it's not disabled if !e.state.Test.Options.NoSetup.Bool { diff --git a/execution/scheduler_ext_test.go b/execution/scheduler_ext_test.go index 6e9f37c048c..2bb10fa11e2 100644 --- a/execution/scheduler_ext_test.go +++ b/execution/scheduler_ext_test.go @@ -88,6 +88,13 @@ func newTestScheduler( } }() + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + t.Cleanup(func() { + stopEmission() + close(samples) + }) + return ctx, cancel, execScheduler, samples } @@ -143,6 +150,12 @@ func TestSchedulerRunNonDefault(t *testing.T) { done := make(chan struct{}) samples := make(chan metrics.SampleContainer) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + go func() { assert.NoError(t, execScheduler.Run(ctx, ctx, samples)) close(done) @@ -254,6 +267,12 @@ func TestSchedulerRunEnv(t *testing.T) { done := make(chan struct{}) samples := make(chan metrics.SampleContainer) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + go func() { assert.NoError(t, execScheduler.Run(ctx, ctx, samples)) close(done) @@ -321,6 +340,12 @@ func TestSchedulerSystemTags(t *testing.T) { defer cancel() samples := make(chan metrics.SampleContainer) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + done := make(chan struct{}) go func() { defer close(done) @@ -452,6 +477,12 @@ func TestSchedulerRunCustomTags(t *testing.T) { done := make(chan struct{}) samples := make(chan metrics.SampleContainer) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + go func() { defer close(done) require.NoError(t, execScheduler.Run(ctx, ctx, samples)) @@ -614,8 +645,13 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { defer cancel() samples := make(chan metrics.SampleContainer) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + go func() { assert.NoError(t, execScheduler.Run(ctx, ctx, samples)) + stopEmission() close(samples) }() @@ -947,6 +983,12 @@ func TestSchedulerEndIterations(t *testing.T) { require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + require.NoError(t, execScheduler.Run(ctx, ctx, samples)) assert.Equal(t, uint64(100), execScheduler.GetState().GetFullIterationCount()) @@ -1155,9 +1197,15 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { defer cancel() done := make(chan struct{}) - sampleContainers := make(chan metrics.SampleContainer) + samples := make(chan metrics.SampleContainer) + defer close(samples) + + stopEmission, err := execScheduler.Init(ctx, samples) + require.NoError(t, err) + defer stopEmission() + go func() { - assert.NoError(t, execScheduler.Run(ctx, ctx, sampleContainers)) + assert.NoError(t, execScheduler.Run(ctx, ctx, samples)) close(done) }() @@ -1167,7 +1215,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { to *= time.Millisecond for { select { - case sampleContainer := <-sampleContainers: + case sampleContainer := <-samples: gotVus := false for _, s := range sampleContainer.GetSamples() { if s.Metric == piState.BuiltinMetrics.VUs || s.Metric == piState.BuiltinMetrics.VUsMax { @@ -1257,7 +1305,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { for { select { - case s := <-sampleContainers: + case s := <-samples: t.Fatalf("Did not expect anything in the sample channel bug got %#v", s) case <-time.After(3 * time.Second): t.Fatalf("Local execScheduler took way to long to finish") diff --git a/js/runner_test.go b/js/runner_test.go index ec9ab13bdd6..02ebb28d2c6 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -406,6 +406,9 @@ func TestDataIsolation(t *testing.T) { require.Empty(t, runner.defaultGroup.Groups) + stopEmission, err := execScheduler.Init(runCtx, samples) + require.NoError(t, err) + errC := make(chan error) go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }() @@ -414,6 +417,7 @@ func TestDataIsolation(t *testing.T) { runAbort(fmt.Errorf("unexpected abort")) t.Fatal("Test timed out") case err := <-errC: + stopEmission() close(samples) require.NoError(t, err) waitForMetricsFlushed()