From 15cd3fc3cef4c65d8517a556ff4d1dc0d4fa6d48 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Tue, 23 Jan 2024 01:06:53 +0000 Subject: [PATCH] run jobs in background --- .../types/examples/background_test.go | 102 +++++++++++++++ test/e2e/framework/types/job.go | 119 +++++++++++++----- test/e2e/framework/types/step.go | 12 +- test/e2e/framework/types/step_sleep.go | 4 + test/e2e/framework/types/step_stop.go | 32 +++++ 5 files changed, 238 insertions(+), 31 deletions(-) create mode 100644 test/e2e/framework/types/examples/background_test.go create mode 100644 test/e2e/framework/types/step_stop.go diff --git a/test/e2e/framework/types/examples/background_test.go b/test/e2e/framework/types/examples/background_test.go new file mode 100644 index 0000000000..3abc64a3ef --- /dev/null +++ b/test/e2e/framework/types/examples/background_test.go @@ -0,0 +1,102 @@ +package types + +import ( + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/Azure/azure-container-networking/test/e2e/framework/types" +) + +func TestFramework(t *testing.T) { + job := types.NewJob("Validate that drop metrics are present in the prometheus endpoint") + runner := types.NewRunner(t, job) + defer runner.Run() + + job.AddStep(&TestBackground{ + CounterName: "Example Counter", + }, &types.StepOptions{ + ExpectError: false, + RunInBackgroundWithID: "TestStep", + }) + + job.AddStep(&types.Sleep{ + Duration: 1 * time.Second, + }, nil) + + job.AddStep(&types.Stop{ + BackgroundID: "TestStep", + }, nil) +} + +type TestBackground struct { + CounterName string + c *counter +} + +func (t *TestBackground) Run() error { + t.c = newCounter() + err := t.c.Start() + if err != nil { + return fmt.Errorf("failed to start counter: %w", err) + } + log.Println("running counter: " + t.CounterName) + return nil +} + +func (t *TestBackground) Stop() error { + log.Println("stopping counter: " + t.CounterName) + err := t.c.Stop() + if err != nil { + return fmt.Errorf("failed to stop counter: %w", err) + } + log.Println("count:", t.c.count) + return nil +} + +func (t *TestBackground) Prevalidate() error { + return nil +} + +func (t *TestBackground) Postvalidate() error { + return nil +} + +type counter struct { + ticker *time.Ticker + count int + ch chan struct{} + wg sync.WaitGroup +} + +func newCounter() *counter { + return &counter{ + ch: make(chan struct{}), + } +} + +func (c *counter) Start() error { + c.ticker = time.NewTicker(1 * time.Millisecond) + c.wg.Add(1) + go func() { + for { + select { + case <-c.ticker.C: + c.count++ + case <-c.ch: + c.wg.Done() + return + } + } + }() + + return nil +} + +func (c *counter) Stop() error { + close(c.ch) + c.wg.Wait() + return nil +} diff --git a/test/e2e/framework/types/job.go b/test/e2e/framework/types/job.go index eef5bb0b78..ae22ee0d74 100644 --- a/test/e2e/framework/types/job.go +++ b/test/e2e/framework/types/job.go @@ -12,12 +12,15 @@ var ( ErrNilError = fmt.Errorf("expected error to be nil") ErrMissingParameter = fmt.Errorf("missing parameter") ErrParameterAlreadySet = fmt.Errorf("parameter already set") + ErrOrphanSteps = fmt.Errorf("background steps with no corresponding stop") + ErrCannotStopStep = fmt.Errorf("cannot stop step") ) type Job struct { - Values *JobValues - Description string - Steps []*StepWrapper + Values *JobValues + Description string + Steps []*StepWrapper + BackgroundSteps map[string]*StepWrapper } type StepWrapper struct { @@ -45,7 +48,8 @@ func NewJob(description string) *Job { Values: &JobValues{ kv: make(map[string]string), }, - Description: description, + BackgroundSteps: make(map[string]*StepWrapper), + Description: description, } } @@ -56,10 +60,11 @@ func (j *Job) AddScenario(steps ...StepWrapper) { } func (j *Job) AddStep(step Step, opts *StepOptions) { - j.Steps = append(j.Steps, &StepWrapper{ + stepw := &StepWrapper{ Step: step, Opts: opts, - }) + } + j.Steps = append(j.Steps, stepw) } func (j *Job) Run() error { @@ -67,6 +72,7 @@ func (j *Job) Run() error { return ErrEmptyDescription } + // validate all steps in the job, making sure parameters are set/validated etc. err := j.Validate() if err != nil { return err // nolint:wrapcheck // don't wrap error, wouldn't provide any more context than the error itself @@ -100,11 +106,58 @@ func (j *Job) Run() error { } func (j *Job) Validate() error { + // ensure that there are no background steps left after running + for _, wrapper := range j.Steps { err := j.validateStep(wrapper) if err != nil { return err } + + } + + err := j.validateBackgroundSteps() + if err != nil { + return err + } + + return nil +} + +func (j *Job) validateBackgroundSteps() error { + stoppedBackgroundSteps := make(map[string]bool) + + for _, stepw := range j.Steps { + switch s := stepw.Step.(type) { + case *Stop: + if j.BackgroundSteps[s.BackgroundID] == nil { + return fmt.Errorf("cannot stop step %s, as it won't be started by this time; %w", s.BackgroundID, ErrCannotStopStep) + } + if stopped := stoppedBackgroundSteps[s.BackgroundID]; stopped { + return fmt.Errorf("cannot stop step %s, as it has already been stopped; %w", s.BackgroundID, ErrCannotStopStep) + } + + // track for later on if the stop step is called + stoppedBackgroundSteps[s.BackgroundID] = true + + // set the stop step within the step + s.Step = j.BackgroundSteps[s.BackgroundID].Step + + default: + if stepw.Opts.RunInBackgroundWithID != "" { + if _, exists := j.BackgroundSteps[stepw.Opts.RunInBackgroundWithID]; exists { + log.Fatalf("step with id %s already exists", stepw.Opts.RunInBackgroundWithID) + } + j.BackgroundSteps[stepw.Opts.RunInBackgroundWithID] = stepw + stoppedBackgroundSteps[stepw.Opts.RunInBackgroundWithID] = false + } + } + } + + for stepName, stopped := range stoppedBackgroundSteps { + if !stopped { + return fmt.Errorf("step %s was not stopped; %w", stepName, ErrOrphanSteps) + } } return nil @@ -119,41 +172,51 @@ func (j *Job) validateStep(stepw *StepWrapper) error { stepw.Opts = &DefaultOpts } - for i, f := range reflect.VisibleFields(val.Type()) { + switch stepw.Step.(type) { + case *Stop: + // don't validate stop steps + return nil - // skip saving unexported fields - if !f.IsExported() { - continue - } + case *Sleep: + // don't validate sleep steps + return nil + + default: + for i, f := range reflect.VisibleFields(val.Type()) { - k := reflect.Indirect(val.Field(i)).Kind() + // skip saving unexported fields + if !f.IsExported() { + continue + } - if k == reflect.String { - parameter := val.Type().Field(i).Name - value := val.Field(i).Interface().(string) - storedValue := j.Values.Get(parameter) + k := reflect.Indirect(val.Field(i)).Kind() + + if k == reflect.String { + parameter := val.Type().Field(i).Name + value := val.Field(i).Interface().(string) + storedValue := j.Values.Get(parameter) + + if storedValue == "" { + if value != "" { - if storedValue == "" { - if value != "" { - if stepw.Opts.SaveParametersToJob { fmt.Printf("%s setting parameter %s in job context to %s\n", stepName, parameter, value) j.Values.Set(parameter, value) + + } else { + return fmt.Errorf("missing parameter %s for step %s: %w", parameter, stepName, ErrMissingParameter) } continue } - return fmt.Errorf("missing parameter %s for step %s: %w", parameter, stepName, ErrMissingParameter) - } + if value != "" { + return fmt.Errorf("parameter %s for step %s is already set from previous step: %w", parameter, stepName, ErrParameterAlreadySet) + } - if value != "" { - return fmt.Errorf("parameter %s for step %s is already set from previous step: %w", parameter, stepName, ErrParameterAlreadySet) + // don't use log format since this is technically preexecution and easier to read + fmt.Println(stepName, "using previously stored value for parameter", parameter, "set as", j.Values.Get(parameter)) + val.Field(i).SetString(storedValue) } - - // don't use log format since this is technically preexecution and easier to read - fmt.Println(stepName, "using previously stored value for parameter", parameter, "set as", j.Values.Get(parameter)) - val.Field(i).SetString(storedValue) } } - return nil } diff --git a/test/e2e/framework/types/step.go b/test/e2e/framework/types/step.go index 096de78970..fc82cb1ae2 100644 --- a/test/e2e/framework/types/step.go +++ b/test/e2e/framework/types/step.go @@ -1,14 +1,15 @@ package types var DefaultOpts = StepOptions{ - ExpectError: false, - SaveParametersToJob: true, + ExpectError: false, + SkipSavingParamatersToJob: false, } type Step interface { Prevalidate() error Run() error Postvalidate() error + Stop() error } type StepOptions struct { @@ -18,5 +19,10 @@ type StepOptions struct { // a step, but you don't want to save the parameters // ex: Sleep for 15 seconds, then Sleep for 10 seconds, // you don't want to save the parameters - SaveParametersToJob bool + SkipSavingParamatersToJob bool + + // Will save this step to the job's steps + // and then later on when Stop is called with job name, + // it will call Stop() on the step + RunInBackgroundWithID string } diff --git a/test/e2e/framework/types/step_sleep.go b/test/e2e/framework/types/step_sleep.go index b65f7bfeaf..1751fe7367 100644 --- a/test/e2e/framework/types/step_sleep.go +++ b/test/e2e/framework/types/step_sleep.go @@ -15,6 +15,10 @@ func (c *Sleep) Run() error { return nil } +func (c *Sleep) Stop() error { + return nil +} + func (c *Sleep) Prevalidate() error { return nil } diff --git a/test/e2e/framework/types/step_stop.go b/test/e2e/framework/types/step_stop.go new file mode 100644 index 0000000000..9a7ee3f151 --- /dev/null +++ b/test/e2e/framework/types/step_stop.go @@ -0,0 +1,32 @@ +package types + +import ( + "fmt" + "reflect" +) + +type Stop struct { + BackgroundID string + Step Step +} + +func (c *Stop) Run() error { + err := c.Step.Stop() + if err != nil { + stepName := reflect.TypeOf(c.Step).Elem().Name() + return fmt.Errorf("failed to stop step: %s with err %w", stepName, err) + } + return nil +} + +func (c *Stop) Stop() error { + return nil +} + +func (c *Stop) Prevalidate() error { + return nil +} + +func (c *Stop) Postvalidate() error { + return nil +}