Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/cloudwatch #1032

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@
[[prune.project]]
name = "github.com/spf13/cobra"
unused-packages = false

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.19.38"
17 changes: 11 additions & 6 deletions cmd/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"strings"

"github.com/loadimpact/k6/stats/awscloudwatch"

"gopkg.in/guregu/null.v3"

"github.com/kelseyhightower/envconfig"
Expand All @@ -40,12 +42,13 @@ import (
)

const (
collectorInfluxDB = "influxdb"
collectorJSON = "json"
collectorKafka = "kafka"
collectorCloud = "cloud"
collectorStatsD = "statsd"
collectorDatadog = "datadog"
collectorInfluxDB = "influxdb"
collectorJSON = "json"
collectorKafka = "kafka"
collectorCloud = "cloud"
collectorStatsD = "statsd"
collectorDatadog = "datadog"
collectorCloudWatch = "cloudwatch"
)

func parseCollector(s string) (t, arg string) {
Expand Down Expand Up @@ -110,6 +113,8 @@ func newCollector(collectorName, arg string, src *lib.SourceData, conf Config) (
return nil, err
}
return datadog.New(config)
case collectorCloudWatch:
return awscloudwatch.New(awscloudwatch.ClientFactory(arg))
default:
return nil, errors.Errorf("unknown output type: %s", collectorName)
}
Expand Down
10 changes: 9 additions & 1 deletion release notes/upcoming.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ TODO: Intro

Now all http methods have an additional param called `compression` that will make k6 compress the body before sending it. It will also correctly set both `Content-Encoding` and `Content-Length`, unless they were manually set in the request `headers` by the user. The current supported algorithms are `deflate` and `gzip` and any combination of the two separated by a comma (`,`).

### New result outputs: CloudWatch (#1032)

You can now output any metrics k6 collects to CloudWatch by running `k6 run --out cloudwatch=metrics_namespace`.
Every tag associated with a metric will be sent as a [dimension](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Dimension).
CloudWatch **only accepts up to 10 dimensions**, the rest of the tags will be ignored. You may want to use **system-tags** option
to reduce produced tags.


## Bugs fixed!

* JS: Many fixes for `open()`: (#965)
Expand All @@ -19,4 +27,4 @@ Now all http methods have an additional param called `compression` that will mak
## Internals

* JS: VU initialization time and memory usage has been significantly decreased by caching the parsed version of the core-js library. Thanks, @matlockx! (#1038)
* CI: removed the gometalinter check in CircleCI, since that project [was deprecated](https://github.com/alecthomas/gometalinter/issues/590) and now exclusively rely on [golangci-lint](https://github.com/golangci/golangci-lint). (#1039)
* CI: removed the gometalinter check in CircleCI, since that project [was deprecated](https://github.com/alecthomas/gometalinter/issues/590) and now exclusively rely on [golangci-lint](https://github.com/golangci/golangci-lint). (#1039)
115 changes: 115 additions & 0 deletions stats/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package awscloudwatch

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type client struct {
cloudwatchiface.CloudWatchAPI
namespace string
endpoint string
}

// ClientFactory returns a function that creates the AWS CloudWatch client
func ClientFactory(namespace string) func() (cloudWatchClient, error) {
return func() (cloudWatchClient, error) {
cw, err := newCloudWatchClient()
if err != nil {
return nil, err
}

return &client{
CloudWatchAPI: cw,
namespace: namespace,
endpoint: cw.Endpoint,
}, nil
}
}

// newCloudWatchClient creates a new CloudWatch client
func newCloudWatchClient() (*cloudwatch.CloudWatch, error) {
sess, err := session.NewSession()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the docs for creating an AWS session (https://godoc.org/github.com/aws/aws-sdk-go/aws/session#hdr-Creating_Sessions), this would load the AWS credentials from ~/.aws/credentials or use AWS_* environment variables? I've only skimmed the SDK docs, but that seems a bit messy to me... 😕

Ideally, this output shouldn't rely on the presence of files on the filesystem or read environment variables directly from the OS. For one, that makes it harder to test, but my main concern is that this could get in the way of the future native distributed execution support we want to introduce in k6... Also, the environment variables here aren't prefixed with K6_, like the other config options are. But from skimming the AWS SDK docs, I'm not sure we can forbid the SDK code from actually doing all of that magic 😕

If the k6 output configuration situation was otherwise perfect (and it's unfortunately not - #587, #883 😞 ), I would force the issue, but I'm not sure if we shouldn't accept this as-is and deprecate the direct options in the future? @mstoykov, thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way that I see to avoid this issue is duplicate the private newSession. I know this is not an ideal solution, but at the same time I don't see a big problem neither. Looking at what NewSessionWithOptions does, for instance, seems that most of the functionality is around more flexible configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of loading things automatically without prefixes ... Not certain if copying the newSession will be enough, but if it is ... maybe that is good idea.

On a related note (of copying code) ... I am under the impression that implementing the whole PutMetric call in k6 instead of using the aws-sdk will not be ... all that much code given that it is just http request.

if err != nil {
return nil, err
}

return cloudwatch.New(sess), nil
}

const maxMetricsPerCall = 20

// reportSamples reports samples to CloudWatch.
// It sends samples in batches of max 20, which is the upper limit of metrics
// accepted per request by CloudWatch
func (c *client) reportSamples(samples []*sample) error {
samplesSent := 0
var lastError error

for samplesSent < len(samples) {
input := &cloudwatch.PutMetricDataInput{Namespace: &c.namespace}
upperLimit := samplesSent + maxMetricsPerCall
if len(samples) < upperLimit {
upperLimit = len(samples)
}

for _, s := range samples[samplesSent:upperLimit] {
input.MetricData = append(input.MetricData, toMetricDatum(s))
samplesSent++
}

_, err := c.PutMetricData(input)
if err != nil {
logrus.WithError(err).Warn("Error sending metrics to CloudWatch")
lastError = err
}
}

if lastError != nil {
return errors.Wrap(lastError, "Error sending metrics")
}

return nil
}

func (c *client) address() string {
return c.endpoint
}

const maxNumberOfDimensions = 10

func toMetricDatum(s *sample) *cloudwatch.MetricDatum {
datum := &cloudwatch.MetricDatum{
Value: &s.Value,
MetricName: &s.Metric,
Timestamp: &s.Time,
}

var dims []*cloudwatch.Dimension

for name, value := range s.Tags {
if len(dims) == maxNumberOfDimensions {
logrus.WithField("tags", s.Tags).
WithField("dimensions_included", dims).
Warnf("More than 10 tags, just 10 will be reported to CloudWatch")
break
}

if value != "" {
dims = append(dims, &cloudwatch.Dimension{
Name: aws.String(name),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very strange... 😕 This just returns a pointer to the value and I have no idea why AWS needs it...

Value: aws.String(value),
})
}
}

if len(dims) > 0 {
datum.Dimensions = dims
}

return datum
}
Loading