From edf514f1ecde404d2c1f611275d10a271e9396f5 Mon Sep 17 00:00:00 2001 From: Pavel Moukhataev Date: Mon, 21 Oct 2019 15:29:51 +0300 Subject: [PATCH] Create metric for flux manifest errors - https://github.com/fluxcd/flux/issues/2199 --- go.mod | 9 +++ pkg/daemon/metrics.go | 7 ++ pkg/daemon/sync.go | 7 ++ pkg/daemon/sync_test.go | 166 +++++++++++++++++++++++++++++++++++++++- pkg/metrics/metrics.go | 1 + 5 files changed, 188 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 15e27bd775..60d148e81d 100644 --- a/go.mod +++ b/go.mod @@ -12,16 +12,21 @@ require ( github.com/fluxcd/helm-operator v1.0.0-rc2 github.com/ghodss/yaml v1.0.0 github.com/go-kit/kit v0.9.0 + github.com/go-stack/stack v1.8.0 // indirect github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0 + github.com/googleapis/gnostic v0.3.0 // indirect github.com/gorilla/mux v1.7.1 github.com/gorilla/websocket v1.4.0 + github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/imdario/mergo v0.3.7 github.com/instrumenta/kubeval v0.0.0-20190804145309-805845b47dfc github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 github.com/pkg/errors v0.8.1 github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 github.com/prometheus/client_golang v1.1.0 + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/ryanuber/go-glob v1.0.0 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd @@ -31,6 +36,9 @@ require ( github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect + golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect + golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 gopkg.in/yaml.v2 v2.2.2 @@ -40,6 +48,7 @@ require ( k8s.io/client-go v11.0.0+incompatible k8s.io/helm v2.13.1+incompatible k8s.io/klog v0.3.3 + k8s.io/utils v0.0.0-20190712204705-3dccf664f023 // indirect ) replace github.com/docker/distribution => github.com/2opremio/distribution v0.0.0-20190419185413-6c9727e5e5de diff --git a/pkg/daemon/metrics.go b/pkg/daemon/metrics.go index 7cad9a1667..c79cb6fe3d 100644 --- a/pkg/daemon/metrics.go +++ b/pkg/daemon/metrics.go @@ -44,4 +44,11 @@ var ( Name: "queue_length_count", Help: "Count of jobs waiting in the queue to be run.", }, []string{}) + + errorsCountMetric = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: "flux", + Subsystem: "daemon", + Name: "errors_count", + Help: "Number of errors.", + }, []string{fluxmetrics.LabelType}) ) diff --git a/pkg/daemon/sync.go b/pkg/daemon/sync.go index e0c597803d..043cd388c3 100644 --- a/pkg/daemon/sync.go +++ b/pkg/daemon/sync.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "github.com/fluxcd/flux/pkg/metrics" "github.com/go-kit/kit/log" "github.com/pkg/errors" "path/filepath" @@ -149,14 +150,17 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) { resources, err := manifestsStore.GetAllResourcesByID(ctx) if err != nil { + errorsCountMetric.With(metrics.LabelType, "manifest").Set(1) return nil, nil, errors.Wrap(err, "loading resources from repo") } + errorsCountMetric.With(metrics.LabelType, "manifest").Set(0) var resourceErrors []event.ResourceError if err := fluxsync.Sync(syncSetName, resources, clus); err != nil { switch syncerr := err.(type) { case cluster.SyncError: logger.Log("err", err) + errorsCountMetric.With(metrics.LabelType, "sync").Set(float64(len(syncerr))) for _, e := range syncerr { resourceErrors = append(resourceErrors, event.ResourceError{ ID: e.ResourceID, @@ -165,8 +169,11 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl }) } default: + errorsCountMetric.With(metrics.LabelType, "sync").Set(1) return nil, nil, err } + } else { + errorsCountMetric.With(metrics.LabelType, "sync").Set(0) } return resources, resourceErrors, nil } diff --git a/pkg/daemon/sync_test.go b/pkg/daemon/sync_test.go index d159c35feb..5855a1cba5 100644 --- a/pkg/daemon/sync_test.go +++ b/pkg/daemon/sync_test.go @@ -12,8 +12,6 @@ import ( "testing" "time" - "github.com/go-kit/kit/log" - "github.com/fluxcd/flux/pkg/cluster" "github.com/fluxcd/flux/pkg/cluster/kubernetes" "github.com/fluxcd/flux/pkg/cluster/kubernetes/testfiles" @@ -26,6 +24,9 @@ import ( registryMock "github.com/fluxcd/flux/pkg/registry/mock" "github.com/fluxcd/flux/pkg/resource" fluxsync "github.com/fluxcd/flux/pkg/sync" + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" ) const ( @@ -86,6 +87,50 @@ func daemon(t *testing.T) (*Daemon, func()) { } } +func findMetric(name string, metricType io_prometheus_client.MetricType, labels ...string) (*io_prometheus_client.Metric, error) { + metricsRegistry := prometheus.DefaultRegisterer.(*prometheus.Registry) + if metrics, err := metricsRegistry.Gather(); err == nil { + for _, metricFamily := range metrics { + if *metricFamily.Name == name { + if *metricFamily.Type != metricType { + return nil, fmt.Errorf("Metric types for %v doesn't correpond: %v != %v", name, metricFamily.Type, metricType) + } + for _, metric := range metricFamily.Metric { + if len(labels) != len(metric.Label)*2 { + return nil, fmt.Errorf("Metric labels length for %v doesn't correpond: %v != %v", name, len(labels)*2, len(metric.Label)) + } + for labelIdx, label := range metric.Label { + if labels[labelIdx*2] != *label.Name { + return nil, fmt.Errorf("Metric label for %v doesn't correpond: %v != %v", name, labels[labelIdx*2], *label.Name) + } else if labels[labelIdx*2+1] != *label.Value { + break + } else if labelIdx == len(metric.Label)-1 { + return metric, nil + } + } + } + return nil, fmt.Errorf("Can't find metric %v with appropriate labels in registry", name) + } + } + return nil, fmt.Errorf("Can't find metric %v in registry", name) + } else { + return nil, fmt.Errorf("Error reading metrics registry %v", err) + } +} + +func checkErrorsMetrics(t *testing.T, manifestErrors, syncErrors int) { + if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "manifest"); err != nil { + t.Errorf("Error collecting metrics %v", err) + } else if int(*metric.Gauge.Value) != manifestErrors { + t.Errorf("Manifest errors must be %v. Got %v", manifestErrors, *metric.Gauge.Value) + } + if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "sync"); err != nil { + t.Errorf("Error collecting metrics %v", err) + } else if int(*metric.Gauge.Value) != syncErrors { + t.Errorf("Sync errors must be %v. Got %v", syncErrors, *metric.Gauge.Value) + } +} + func TestPullAndSync_InitialSync(t *testing.T) { d, cleanup := daemon(t) defer cleanup() @@ -148,6 +193,18 @@ func TestPullAndSync_InitialSync(t *testing.T) { } else if len(revs) <= 0 { t.Errorf("Found no revisions before the sync tag") } + + // Check 0 error stats + if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "manifest"); err != nil { + t.Errorf("Error collecting metrics %v", err) + } else if int(*metric.Gauge.Value) != 0 { + t.Errorf("Manifest errors must be 0. Got %v", *metric.Gauge.Value) + } + if metric, err := findMetric("flux_daemon_errors_count", io_prometheus_client.MetricType_GAUGE, "type", "sync"); err != nil { + t.Errorf("Error collecting metrics %v", err) + } else if int(*metric.Gauge.Value) != 0 { + t.Errorf("Sync errors must be 0. Got %v", *metric.Gauge.Value) + } } func TestDoSync_NoNewCommits(t *testing.T) { @@ -357,3 +414,108 @@ func TestDoSync_WithNewCommit(t *testing.T) { t.Errorf("Should have moved sync tag to HEAD (%s), but was moved to: %s", newRevision, revs[len(revs)-1].Revision) } } + +func TestDoSync_WithErrors(t *testing.T) { + d, cleanup := daemon(t) + defer cleanup() + + k8s.SyncFunc = func(def cluster.SyncSet) error { + return nil + } + + ctx := context.Background() + head, err := d.Repo.BranchHead(ctx) + if err != nil { + t.Fatal(err) + } + + syncTag := "sync" + gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig) + syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync} + + if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil { + t.Error(err) + } + + // Check 0 error stats + checkErrorsMetrics(t, 0, 0) + + // Now add wrong manifest + err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error { + ctx, cancel := context.WithTimeout(ctx, 5000*time.Second) + defer cancel() + + absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml") + if err := ioutil.WriteFile(absolutePath, []byte("Manifest that must produce errors"), 0600); err != nil { + return err + } + commitAction := git.CommitAction{Author: "", Message: "test error commit"} + err = checkout.CommitAndPush(ctx, commitAction, nil, true) + if err != nil { + return err + } + return err + }) + if err != nil { + t.Fatal(err) + } + + err = d.Repo.Refresh(ctx) + if err != nil { + t.Error(err) + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + // Check 1 error stats + checkErrorsMetrics(t, 1, 0) + } else { + t.Error("Sync must fail because of invalid manifest") + } + + // Fix manifest + err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error { + ctx, cancel := context.WithTimeout(ctx, 5000*time.Second) + defer cancel() + + absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml") + if err := ioutil.WriteFile(absolutePath, []byte("# Just comment"), 0600); err != nil { + return err + } + commitAction := git.CommitAction{Author: "", Message: "test fix commit"} + err = checkout.CommitAndPush(ctx, commitAction, nil, true) + if err != nil { + return err + } + return err + }) + + if err != nil { + t.Fatal(err) + } + + err = d.Repo.Refresh(ctx) + if err != nil { + t.Error(err) + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + t.Error(err) + } + // Check 0 manifest error stats + checkErrorsMetrics(t, 0, 0) + + // Emulate sync errors + k8s.SyncFunc = func(def cluster.SyncSet) error { + return cluster.SyncError{ + cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl1"), "src1", fmt.Errorf("Error1")}, + cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl2"), "src2", fmt.Errorf("Error2")}, + } + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + t.Error(err) + } + + // Check 2 sync error in stats + checkErrorsMetrics(t, 0, 2) +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 554c6ba304..94442eda74 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -8,6 +8,7 @@ const ( LabelRoute = "route" LabelMethod = "method" LabelSuccess = "success" + LabelType = "type" // Labels for release metrics LabelAction = "action"