Skip to content

Commit

Permalink
Create metric for flux manifest errors - fluxcd#2199
Browse files Browse the repository at this point in the history
  • Loading branch information
mpashka committed Oct 21, 2019
1 parent 7ed5de4 commit edf514f
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 2 deletions.
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ require (
github.com/fluxcd/helm-operator v1.0.0-rc2
github.com/ghodss/yaml v1.0.0
github.com/go-kit/kit v0.9.0
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0
github.com/googleapis/gnostic v0.3.0 // indirect
github.com/gorilla/mux v1.7.1
github.com/gorilla/websocket v1.4.0
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7
github.com/instrumenta/kubeval v0.0.0-20190804145309-805845b47dfc
github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1
github.com/pkg/errors v0.8.1
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/ryanuber/go-glob v1.0.0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
Expand All @@ -31,6 +36,9 @@ require (
github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e
github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab
github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v2 v2.2.2
Expand All @@ -40,6 +48,7 @@ require (
k8s.io/client-go v11.0.0+incompatible
k8s.io/helm v2.13.1+incompatible
k8s.io/klog v0.3.3
k8s.io/utils v0.0.0-20190712204705-3dccf664f023 // indirect
)

replace github.com/docker/distribution => github.com/2opremio/distribution v0.0.0-20190419185413-6c9727e5e5de
Expand Down
7 changes: 7 additions & 0 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ var (
Name: "queue_length_count",
Help: "Count of jobs waiting in the queue to be run.",
}, []string{})

errorsCountMetric = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: "flux",
Subsystem: "daemon",
Name: "errors_count",
Help: "Number of errors.",
}, []string{fluxmetrics.LabelType})
)
7 changes: 7 additions & 0 deletions pkg/daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"github.com/fluxcd/flux/pkg/metrics"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"path/filepath"
Expand Down Expand Up @@ -149,14 +150,17 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) {
resources, err := manifestsStore.GetAllResourcesByID(ctx)
if err != nil {
errorsCountMetric.With(metrics.LabelType, "manifest").Set(1)
return nil, nil, errors.Wrap(err, "loading resources from repo")
}
errorsCountMetric.With(metrics.LabelType, "manifest").Set(0)

var resourceErrors []event.ResourceError
if err := fluxsync.Sync(syncSetName, resources, clus); err != nil {
switch syncerr := err.(type) {
case cluster.SyncError:
logger.Log("err", err)
errorsCountMetric.With(metrics.LabelType, "sync").Set(float64(len(syncerr)))
for _, e := range syncerr {
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID,
Expand All @@ -165,8 +169,11 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
})
}
default:
errorsCountMetric.With(metrics.LabelType, "sync").Set(1)
return nil, nil, err
}
} else {
errorsCountMetric.With(metrics.LabelType, "sync").Set(0)
}
return resources, resourceErrors, nil
}
Expand Down
166 changes: 164 additions & 2 deletions pkg/daemon/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"

"github.com/fluxcd/flux/pkg/cluster"
"github.com/fluxcd/flux/pkg/cluster/kubernetes"
"github.com/fluxcd/flux/pkg/cluster/kubernetes/testfiles"
Expand All @@ -26,6 +24,9 @@ import (
registryMock "github.com/fluxcd/flux/pkg/registry/mock"
"github.com/fluxcd/flux/pkg/resource"
fluxsync "github.com/fluxcd/flux/pkg/sync"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
)

const (
Expand Down Expand Up @@ -86,6 +87,50 @@ func daemon(t *testing.T) (*Daemon, func()) {
}
}

func findMetric(name string, metricType io_prometheus_client.MetricType, labels ...string) (*io_prometheus_client.Metric, error) {
metricsRegistry := prometheus.DefaultRegisterer.(*prometheus.Registry)
if metrics, err := metricsRegistry.Gather(); err == nil {
for _, metricFamily := range metrics {
if *metricFamily.Name == name {
if *metricFamily.Type != metricType {
return nil, fmt.Errorf("Metric types for %v doesn't correpond: %v != %v", name, metricFamily.Type, metricType)
}
for _, metric := range metricFamily.Metric {
if len(labels) != len(metric.Label)*2 {
return nil, fmt.Errorf("Metric labels length for %v doesn't correpond: %v != %v", name, len(labels)*2, len(metric.Label))
}
for labelIdx, label := range metric.Label {
if labels[labelIdx*2] != *label.Name {
return nil, fmt.Errorf("Metric label for %v doesn't correpond: %v != %v", name, labels[labelIdx*2], *label.Name)
} else if labels[labelIdx*2+1] != *label.Value {
break
} else if labelIdx == len(metric.Label)-1 {
return metric, nil
}
}
}
return nil, fmt.Errorf("Can't find metric %v with appropriate labels in registry", name)
}
}
return nil, fmt.Errorf("Can't find metric %v in registry", name)
} else {
return nil, fmt.Errorf("Error reading metrics registry %v", err)
}
}

func checkErrorsMetrics(t *testing.T, manifestErrors, syncErrors int) {
if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "manifest"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != manifestErrors {
t.Errorf("Manifest errors must be %v. Got %v", manifestErrors, *metric.Gauge.Value)
}
if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "sync"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != syncErrors {
t.Errorf("Sync errors must be %v. Got %v", syncErrors, *metric.Gauge.Value)
}
}

func TestPullAndSync_InitialSync(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()
Expand Down Expand Up @@ -148,6 +193,18 @@ func TestPullAndSync_InitialSync(t *testing.T) {
} else if len(revs) <= 0 {
t.Errorf("Found no revisions before the sync tag")
}

// Check 0 error stats
if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "manifest"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != 0 {
t.Errorf("Manifest errors must be 0. Got %v", *metric.Gauge.Value)
}
if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "sync"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != 0 {
t.Errorf("Sync errors must be 0. Got %v", *metric.Gauge.Value)
}
}

func TestDoSync_NoNewCommits(t *testing.T) {
Expand Down Expand Up @@ -357,3 +414,108 @@ func TestDoSync_WithNewCommit(t *testing.T) {
t.Errorf("Should have moved sync tag to HEAD (%s), but was moved to: %s", newRevision, revs[len(revs)-1].Revision)
}
}

func TestDoSync_WithErrors(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()

k8s.SyncFunc = func(def cluster.SyncSet) error {
return nil
}

ctx := context.Background()
head, err := d.Repo.BranchHead(ctx)
if err != nil {
t.Fatal(err)
}

syncTag := "sync"
gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig)
syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync}

if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil {
t.Error(err)
}

// Check 0 error stats
checkErrorsMetrics(t, 0, 0)

// Now add wrong manifest
err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
ctx, cancel := context.WithTimeout(ctx, 5000*time.Second)
defer cancel()

absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml")
if err := ioutil.WriteFile(absolutePath, []byte("Manifest that must produce errors"), 0600); err != nil {
return err
}
commitAction := git.CommitAction{Author: "", Message: "test error commit"}
err = checkout.CommitAndPush(ctx, commitAction, nil, true)
if err != nil {
return err
}
return err
})
if err != nil {
t.Fatal(err)
}

err = d.Repo.Refresh(ctx)
if err != nil {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
// Check 1 error stats
checkErrorsMetrics(t, 1, 0)
} else {
t.Error("Sync must fail because of invalid manifest")
}

// Fix manifest
err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
ctx, cancel := context.WithTimeout(ctx, 5000*time.Second)
defer cancel()

absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml")
if err := ioutil.WriteFile(absolutePath, []byte("# Just comment"), 0600); err != nil {
return err
}
commitAction := git.CommitAction{Author: "", Message: "test fix commit"}
err = checkout.CommitAndPush(ctx, commitAction, nil, true)
if err != nil {
return err
}
return err
})

if err != nil {
t.Fatal(err)
}

err = d.Repo.Refresh(ctx)
if err != nil {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
t.Error(err)
}
// Check 0 manifest error stats
checkErrorsMetrics(t, 0, 0)

// Emulate sync errors
k8s.SyncFunc = func(def cluster.SyncSet) error {
return cluster.SyncError{
cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl1"), "src1", fmt.Errorf("Error1")},
cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl2"), "src2", fmt.Errorf("Error2")},
}
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
t.Error(err)
}

// Check 2 sync error in stats
checkErrorsMetrics(t, 0, 2)
}
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
LabelRoute = "route"
LabelMethod = "method"
LabelSuccess = "success"
LabelType = "type"

// Labels for release metrics
LabelAction = "action"
Expand Down

0 comments on commit edf514f

Please sign in to comment.