Support for the Cloud output #86

Merged: 15 commits, May 13, 2022

46 changes: 46 additions & 0 deletions README.md
@@ -154,6 +154,52 @@ Defines options for the starter pod. This includes:
* passing in custom image
* passing in labels and annotations

### k6 outputs

#### k6 Cloud output

k6 supports [output to its Cloud](https://k6.io/docs/results-visualization/cloud) with the `k6 run --out cloud script.js` command. This feature is also available in the k6-operator for subscribed users. Note that it currently supports only `parallelism: 20` or less.

To use this option in the k6-operator, set the argument in the YAML:

```yaml
...
script:
configMap:
name: "<configmap>"
arguments: --out cloud
...
```
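
Later in this diff the controller decides whether cloud output was requested by calling `types.ParseCLI(&k6.Spec)` and checking its `HasCloudOut` field. The parser itself is not part of this excerpt; the following is a minimal sketch of such detection, in which the `ParsedCLI` type, the function name, and the accepted flag forms are assumptions for illustration:

```go
package types

import "strings"

// ParsedCLI is a hypothetical result type; only the HasCloudOut
// field is confirmed by this diff.
type ParsedCLI struct {
	HasCloudOut bool
}

// parseArguments scans the k6 CLI arguments for a cloud output flag,
// assuming both the `--out cloud` and `--out=cloud` forms should match.
func parseArguments(args string) ParsedCLI {
	var cli ParsedCLI
	fields := strings.Fields(args)
	for i, f := range fields {
		if (f == "--out" || f == "-o") && i+1 < len(fields) && fields[i+1] == "cloud" {
			cli.HasCloudOut = true
		}
		if f == "--out=cloud" || f == "-o=cloud" {
			cli.HasCloudOut = true
		}
	}
	return cli
}
```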

Then uncomment the cloud output section in `config/default/kustomization.yaml` and copy your token from the Cloud into it:

```yaml
# Uncomment this section if you need cloud output and copy-paste your token
secretGenerator:
- name: cloud-token
literals:
- token=<copy-paste-token-string-here>
options:
annotations:
kubernetes.io/service-account.name: k6-operator-controller
labels:
k6cloud: token
```

This is sufficient to run k6 with the Cloud output and the default values of `projectID` and `name` (`"k6-operator-test"`). For non-default values, extended script options can be used like this:

```js
export let options = {
...
ext: {
loadimpact: {
name: 'Configured k6-operator test',
projectID: 1234567,
}
}
};
```

### Cleaning up between test runs
After completing a test run, you need to clean up the test jobs created. This is done by running the following command:
```bash
...
```
2 changes: 1 addition & 1 deletion api/v1alpha1/k6_types.go
@@ -90,7 +90,7 @@ type K6Configmap struct {
type Cleanup string

// Stage describes which stage of the test execution lifecycle our runners are in
// +kubebuilder:validation:Enum=initialization;initialized;created;started;finished
type Stage string

// K6Status defines the observed state of K6
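
With the two new values, the stage lifecycle becomes a linear progression. Purely as an illustration (the controller in this PR compares plain string literals rather than named constants), the stages could be written out as:

```go
// Illustrative only: the string values come from the enum above,
// but these constants do not exist in the PR itself.
const (
	StageEmpty          Stage = ""               // fresh resource, nothing scheduled yet
	StageInitialization Stage = "initialization" // one-off initializer job is running
	StageInitialized    Stage = "initialized"    // initialization done, runners can be created
	StageCreated        Stage = "created"        // runner jobs created, waiting to be started
	StageStarted        Stage = "started"        // test is executing
	StageFinished       Stage = "finished"       // all runners done, cloud run finalized
)
```
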
2 changes: 2 additions & 0 deletions config/crd/bases/k6.io_k6s.yaml
@@ -1956,6 +1956,8 @@ spec:
description: Stage describes which stage of the test execution lifecycle
our runners are in
enum:
- initialization
- initialized
- created
- started
- finished
11 changes: 11 additions & 0 deletions config/default/kustomization.yaml
@@ -62,3 +62,14 @@ vars:
# kind: Service
# version: v1
# name: webhook-service

# Uncomment this section if you need cloud output and copy-paste your token
# secretGenerator:
# - name: cloud-token
# literals:
# - token=
# options:
# annotations:
# kubernetes.io/service-account.name: k6-operator-controller
# labels:
# k6cloud: token
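
The RBAC change below grants the operator read access to Secrets, which suggests the controller looks the token up at runtime by its `k6cloud: token` label. A sketch of what such a lookup could look like with the controller-runtime client; the function name and error messages are illustrative, not taken from the PR:

```go
package controllers

import (
	"context"
	"errors"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// loadCloudToken fetches the token generated by the kustomize
// secretGenerator above, selected by its k6cloud=token label.
func loadCloudToken(ctx context.Context, c client.Client, namespace string) (string, error) {
	secrets := &corev1.SecretList{}
	opts := &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(map[string]string{"k6cloud": "token"}),
		Namespace:     namespace,
	}
	if err := c.List(ctx, secrets, opts); err != nil {
		return "", err
	}
	if len(secrets.Items) == 0 {
		return "", errors.New("no secret labeled k6cloud=token found")
	}
	token, ok := secrets.Items[0].Data["token"]
	if !ok {
		return "", errors.New("secret has no `token` key")
	}
	return string(token), nil
}
```
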
9 changes: 9 additions & 0 deletions config/rbac/role.yaml
@@ -45,6 +45,7 @@ rules:
- ""
resources:
- pods
- pods/log
verbs:
- get
- list
@@ -70,3 +71,11 @@ rules:
- get
- patch
- update
- apiGroups:
- ""
resources:
- secrets
verbs:
- list
- get
- watch
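
The new `pods/log` permission above implies that the operator now reads pod logs, presumably to capture output from the initialization job (the `inspectOutput` value used in `controllers/k6_finish.go` below has to come from somewhere). A sketch of reading a pod's log via the `pods/log` subresource with client-go; the helper name is illustrative and the actual parsing is not shown in this diff:

```go
package controllers

import (
	"context"
	"io"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// getPodLog streams the full log of a single pod. How the operator
// actually parses this output is not part of this excerpt.
func getPodLog(ctx context.Context, cs kubernetes.Interface, namespace, pod string) ([]byte, error) {
	req := cs.CoreV1().Pods(namespace).GetLogs(pod, &corev1.PodLogOptions{})
	stream, err := req.Stream(ctx)
	if err != nil {
		return nil, err
	}
	defer stream.Close()
	return io.ReadAll(stream)
}
```
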
10 changes: 10 additions & 0 deletions controllers/k6_controller.go
@@ -56,12 +56,22 @@ func (r *K6Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{Requeue: true}, err
}

log.Info(fmt.Sprintf("Reconcile(); stage = %s", k6.Status.Stage))

switch k6.Status.Stage {
case "":
return InitializeJobs(ctx, log, k6, r)
case "initialization":
// here we're just waiting until initialize is done
// Note: it is present as a separate stage to ensure there's only one
// initialization job at a time
return ctrl.Result{}, nil
case "initialized":
return CreateJobs(ctx, log, k6, r)
case "created":
return StartJobs(ctx, log, k6, r)
case "started":
// wait for test to finish and then mark as finished
return FinishJobs(ctx, log, k6, r)
case "finished":
// delete if configured
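
`InitializeJobs` itself is outside this excerpt. Judging by the `CreateJobs` pattern in `controllers/k6_create.go` below, it presumably ends by advancing the resource into the new `initialization` stage; a sketch of that hand-off, with the function name and body assumed rather than quoted:

```go
// Illustrative sketch, not the PR's actual code: advancing the CR into
// the "initialization" stage using the same status-update pattern as
// CreateJobs. The status change re-triggers Reconcile, whose
// "initialization" case then simply waits for the initializer to finish.
func markInitializing(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
	k6.Status.Stage = "initialization"
	if err := r.Client.Status().Update(ctx, k6); err != nil {
		log.Error(err, "Could not update status of custom resource")
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
```
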
8 changes: 6 additions & 2 deletions controllers/k6_create.go
@@ -14,7 +14,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

// CreateJobs creates jobs that will spawn k6 pods for a distributed test
func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
var err error
var res ctrl.Result
@@ -25,6 +25,7 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco
return res, err
}

log.Info("Changing stage of K6 status to created")
k6.Status.Stage = "created"
if err = r.Client.Status().Update(ctx, k6); err != nil {
log.Error(err, "Could not update status of custom resource")
@@ -62,11 +63,14 @@ func launchTest(ctx context.Context, k6 *v1alpha1.K6, index int, log logr.Logger
msg := fmt.Sprintf("Launching k6 test #%d", index)
log.Info(msg)

if job, err = jobs.NewRunnerJob(k6, index, testRunId, token); err != nil {
log.Error(err, "Failed to generate k6 test job")
return err
}

log.Info(fmt.Sprintf("Runner job is ready to start with image `%s` and command `%s`",
job.Spec.Template.Spec.Containers[0].Image, job.Spec.Template.Spec.Containers[0].Command))

if err = ctrl.SetControllerReference(k6, job, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for job")
return err
86 changes: 62 additions & 24 deletions controllers/k6_finish.go
@@ -3,47 +3,85 @@ package controllers
import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"github.com/grafana/k6-operator/api/v1alpha1"
	"github.com/grafana/k6-operator/pkg/cloud"
	"github.com/grafana/k6-operator/pkg/types"
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// FinishJobs waits for the pods to finish, performs the finishing call for
// the cloud output and moves the state to "finished".
func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
	log.Info("Waiting for pods to finish")

	// Here we assume that the test runs for some time and there is no need to
	// check it more often than twice in a minute.
	//
	// The total timeout for the test is set to the duration of the test + 2 min.
	// These 2 min are meant to cover the time needed to start the pods: sometimes
	// pods are ready a bit later than the operator reaches this stage, so from the
	// operator's viewpoint it takes longer. This behaviour depends on the setup of
	// the cluster; 2 min are meant to be a sufficient safeguard for such cases.

	testDuration := inspectOutput.TotalDuration.TimeDuration()

	err := wait.PollImmediate(time.Second*30, testDuration+time.Minute*2, func() (done bool, err error) {
		selector := labels.SelectorFromSet(map[string]string{
			"app":    "k6",
			"k6_cr":  k6.Name,
			"runner": "true",
		})

		opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace}
		jl := &batchv1.JobList{}

		if err := r.List(ctx, jl, opts); err != nil {
			log.Error(err, "Could not list jobs")
			return false, nil
		}

		// TODO: We should distinguish between Succeeded/Failed/Unknown
		var finished int32
		for _, job := range jl.Items {
			if job.Status.Active != 0 {
				continue
			}
			finished++
		}

		log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism))

		if finished >= k6.Spec.Parallelism {
			return true, nil
		}

		return false, nil
	})

	if err != nil {
		log.Error(err, "Waiting for pods to finish ended with error")
	}

	// If this is a test run with cloud output, try to finalize it regardless.
	if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut {
		if err = cloud.FinishTestRun(testRunId); err != nil {
			log.Error(err, "Could not finalize the test run with cloud output")
		} else {
			log.Info(fmt.Sprintf("Cloud test run %s was finalized successfully", testRunId))
		}
	}

	log.Info("Changing stage of K6 status to finished")
	k6.Status.Stage = "finished"
	if err = r.Client.Status().Update(ctx, k6); err != nil {
		log.Error(err, "Could not update status of custom resource")
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}
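
`cloud.FinishTestRun` comes from the new `pkg/cloud` package, which is not shown in this excerpt. As a rough sketch of what finalizing a cloud test run could involve, here is a hypothetical HTTP variant; the endpoint URL, authentication, and error handling are all assumptions, and the real package presumably reuses k6's own cloud client:

```go
package cloud

import (
	"fmt"
	"net/http"
)

// FinishTestRun notifies the cloud backend that the distributed run is over.
// Hypothetical sketch: the endpoint below is assumed for illustration and is
// not a documented k6 Cloud API.
func FinishTestRun(refID string) error {
	url := fmt.Sprintf("https://ingest.k6.io/v1/tests/%s/finish", refID) // assumed endpoint
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("unexpected status %d while finishing test run %s", resp.StatusCode, refID)
	}
	return nil
}
```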