Skip to content

Commit

Permalink
Split Scheduler.Run() into Init() and Run() again
Browse files Browse the repository at this point in the history
This reverts a big part of #2885 because with all of the logic in Run(), k6 didn't know if it needed to wait when --linger was specified.

In general, if a script error or test.abort() occurs during init, --linger should not apply and k6 should exit immediately. But if test.abort() is called during the test run itself, or the test was stopped in some other way besides Ctrl+C, --linger means that k6 should not exit immediately after stopping the test.
  • Loading branch information
na-- committed Feb 2, 2023
1 parent 27b1a4d commit 94f88eb
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 26 deletions.
5 changes: 5 additions & 0 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestPatchStatus(t *testing.T) {
}

for name, testCase := range testData {
name, testCase := name, testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -136,6 +137,9 @@ func TestPatchStatus(t *testing.T) {
RunState: testState,
}

stopEmission, err := execScheduler.Init(runCtx, samples)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
Expand All @@ -146,6 +150,7 @@ func TestPatchStatus(t *testing.T) {

go func() {
assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal")
stopEmission()
close(samples)
wg.Done()
}()
Expand Down
11 changes: 9 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, onHardStop)
defer stopSignalHandling()

// Initialize the VUs and executors
stopVUEmission, err := execScheduler.Init(runCtx, samples)
if err != nil {
return err
}
defer stopVUEmission()

if conf.Linger.Bool {
defer func() {
msg := "The test is done, but --linger was enabled, so k6 is waiting for Ctrl+C to continue..."
Expand All @@ -291,8 +298,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()
}

// Initialize VUs and start the test! However, we won't immediately return
// if there was an error, we still have things to do.
// Start the test! However, we won't immediately return if there was an
// error, we still have things to do.
err = execScheduler.Run(globalCtx, runCtx, samples)

// Init has passed successfully, so unless disabled, make sure we send a
Expand Down
10 changes: 1 addition & 9 deletions cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,15 +1020,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
`

t.Run("noLinger", func(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
})

t.Run("withLinger", func(t *testing.T) {
t.Parallel()
testAbortedByScriptTestAbort(t, script, runTestWithLinger)
})
testAbortedByScriptTestAbort(t, script, runTestWithNoLinger)
}

func TestAbortedByScriptAbortInVUCode(t *testing.T) {
Expand Down
43 changes: 32 additions & 11 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,18 +372,43 @@ func (e *Scheduler) runExecutor(
runResults <- err
}

// Init concurrently initializes all of the planned VUs and then sequentially
// initializes all of the configured executors. It also starts the measurement
// and emission of the `vus` and `vus_max` metrics.
func (e *Scheduler) Init(
runCtx context.Context, samplesOut chan<- metrics.SampleContainer,
) (stopVUEmission func(), err error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
stopVUEmission = func() {
logger.Debugf("Stopping vus and vux_max metrics emission...")
execSchedRunCancel()
waitForVUsMetricPush()
}

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
e.state.SetExecutionStatus(lib.ExecutionStatusInterrupted)
err = interruptErr
}
if err != nil {
stopVUEmission()
}
}()

return stopVUEmission, e.initVUsAndExecutors(execSchedRunCtx, samplesOut)
}

// Run the Scheduler, funneling all generated metric samples through the supplied
// out channel.
//
//nolint:funlen
func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) {
logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run")

execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx)
waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut)
defer waitForVUsMetricPush()
defer execSchedRunCancel()

defer func() {
if interruptErr := GetCancelReasonIfTestAborted(runCtx); interruptErr != nil {
logger.Debugf("The test run was interrupted, returning '%s' instead of '%s'", interruptErr, err)
Expand All @@ -392,10 +417,6 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
}
}()

if err := e.initVUsAndExecutors(execSchedRunCtx, samplesOut); err != nil {
return err
}

e.initProgress.Modify(pb.WithConstLeft("Run"))
if e.state.IsPaused() {
logger.Debug("Execution is paused, waiting for resume or interrupt...")
Expand All @@ -404,7 +425,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
select {
case <-e.state.ResumeNotify():
// continue
case <-execSchedRunCtx.Done():
case <-runCtx.Done():
return nil
}
}
Expand All @@ -422,7 +443,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met
// TODO: get rid of this context, pass the e.state directly to VUs when they
// are initialized by e.initVUsAndExecutors(). This will also give access to
// its properties in their init context executions.
withExecStateCtx := lib.WithExecutionState(execSchedRunCtx, e.state)
withExecStateCtx := lib.WithExecutionState(runCtx, e.state)

// Run setup() before any executors, if it's not disabled
if !e.state.Test.Options.NoSetup.Bool {
Expand Down
56 changes: 52 additions & 4 deletions execution/scheduler_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func newTestScheduler(
}
}()

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
t.Cleanup(func() {
stopEmission()
close(samples)
})

return ctx, cancel, execScheduler, samples
}

Expand Down Expand Up @@ -143,6 +150,12 @@ func TestSchedulerRunNonDefault(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
Expand Down Expand Up @@ -254,6 +267,12 @@ func TestSchedulerRunEnv(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
Expand Down Expand Up @@ -321,6 +340,12 @@ func TestSchedulerSystemTags(t *testing.T) {
defer cancel()

samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

done := make(chan struct{})
go func() {
defer close(done)
Expand Down Expand Up @@ -452,6 +477,12 @@ func TestSchedulerRunCustomTags(t *testing.T) {

done := make(chan struct{})
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
defer close(done)
require.NoError(t, execScheduler.Run(ctx, ctx, samples))
Expand Down Expand Up @@ -614,8 +645,13 @@ func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) {
defer cancel()

samples := make(chan metrics.SampleContainer)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
stopEmission()
close(samples)
}()

Expand Down Expand Up @@ -947,6 +983,12 @@ func TestSchedulerEndIterations(t *testing.T) {
require.NoError(t, err)

samples := make(chan metrics.SampleContainer, 300)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

require.NoError(t, execScheduler.Run(ctx, ctx, samples))

assert.Equal(t, uint64(100), execScheduler.GetState().GetFullIterationCount())
Expand Down Expand Up @@ -1155,9 +1197,15 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
defer cancel()

done := make(chan struct{})
sampleContainers := make(chan metrics.SampleContainer)
samples := make(chan metrics.SampleContainer)
defer close(samples)

stopEmission, err := execScheduler.Init(ctx, samples)
require.NoError(t, err)
defer stopEmission()

go func() {
assert.NoError(t, execScheduler.Run(ctx, ctx, sampleContainers))
assert.NoError(t, execScheduler.Run(ctx, ctx, samples))
close(done)
}()

Expand All @@ -1167,7 +1215,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {
to *= time.Millisecond
for {
select {
case sampleContainer := <-sampleContainers:
case sampleContainer := <-samples:
gotVus := false
for _, s := range sampleContainer.GetSamples() {
if s.Metric == piState.BuiltinMetrics.VUs || s.Metric == piState.BuiltinMetrics.VUsMax {
Expand Down Expand Up @@ -1257,7 +1305,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) {

for {
select {
case s := <-sampleContainers:
case s := <-samples:
t.Fatalf("Did not expect anything in the sample channel bug got %#v", s)
case <-time.After(3 * time.Second):
t.Fatalf("Local execScheduler took way to long to finish")
Expand Down
4 changes: 4 additions & 0 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ func TestDataIsolation(t *testing.T) {

require.Empty(t, runner.defaultGroup.Groups)

stopEmission, err := execScheduler.Init(runCtx, samples)
require.NoError(t, err)

errC := make(chan error)
go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }()

Expand All @@ -414,6 +417,7 @@ func TestDataIsolation(t *testing.T) {
runAbort(fmt.Errorf("unexpected abort"))
t.Fatal("Test timed out")
case err := <-errC:
stopEmission()
close(samples)
require.NoError(t, err)
waitForMetricsFlushed()
Expand Down

0 comments on commit 94f88eb

Please sign in to comment.