diff --git a/docs/references/monitoring.md b/docs/references/monitoring.md index d061686d8..ed2267e57 100644 --- a/docs/references/monitoring.md +++ b/docs/references/monitoring.md @@ -14,5 +14,15 @@ The following metrics are exposed: | `flux_daemon_queue_duration_seconds` | Duration of time spent in the job queue before execution | `flux_daemon_queue_length_count` | Count of jobs waiting in the queue to be run | `flux_daemon_sync_duration_seconds` | Duration of git-to-cluster synchronisation +| `flux_daemon_sync_manifests` | Number of manifests being synced to cluster | `flux_registry_fetch_duration_seconds` | Duration of image metadata requests (from cache) | `flux_fluxd_connection_duration_seconds` | Duration in seconds of the current connection to fluxsvc + +Flux sync state can be obtained by using the following PromQL expressions: +* `delta(flux_daemon_sync_duration_seconds_count{success='true'}[6m]) < 1` - for general flux sync errors - usually if +that is true then there are some problems with infrastructure or there are manifests parse error or there are manifests +with duplicate ids. + +* `flux_daemon_sync_manifests{success='false'} > 0` - for git manifests errors - if true then there are either some +problems with applying git manifests to kubernetes - e.g. configmap size is too big to fit in annotations or +immutable field (like label selector) was changed. diff --git a/pkg/daemon/metrics.go b/pkg/daemon/metrics.go index 7cad9a166..6c873e584 100644 --- a/pkg/daemon/metrics.go +++ b/pkg/daemon/metrics.go @@ -44,4 +44,11 @@ var ( Name: "queue_length_count", Help: "Count of jobs waiting in the queue to be run.", }, []string{}) + + syncManifestsMetric = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: "flux", + Subsystem: "daemon", + Name: "sync_manifests", + Help: "Number of synchronized manifests", + }, []string{fluxmetrics.LabelSuccess}) ) diff --git a/pkg/daemon/sync.go b/pkg/daemon/sync.go index e0c597803..6d764236b 100644 --- a/pkg/daemon/sync.go +++ b/pkg/daemon/sync.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "github.com/fluxcd/flux/pkg/metrics" "github.com/go-kit/kit/log" "github.com/pkg/errors" "path/filepath" @@ -157,6 +158,7 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl switch syncerr := err.(type) { case cluster.SyncError: logger.Log("err", err) + updateSyncManifestsMetric(len(resources)-len(syncerr), len(syncerr)) for _, e := range syncerr { resourceErrors = append(resourceErrors, event.ResourceError{ ID: e.ResourceID, @@ -167,10 +169,17 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl default: return nil, nil, err } + } else { + updateSyncManifestsMetric(len(resources), 0) } return resources, resourceErrors, nil } +func updateSyncManifestsMetric(success, failure int) { + syncManifestsMetric.With(metrics.LabelSuccess, "true").Set(float64(success)) + syncManifestsMetric.With(metrics.LabelSuccess, "false").Set(float64(failure)) +} + // getChangedResources calculates what resources are modified during // this sync. func (d *Daemon) getChangedResources(ctx context.Context, c changeSet, timeout time.Duration, working *git.Export, diff --git a/pkg/daemon/sync_test.go b/pkg/daemon/sync_test.go index d159c35fe..e5e9f2cc8 100644 --- a/pkg/daemon/sync_test.go +++ b/pkg/daemon/sync_test.go @@ -12,8 +12,6 @@ import ( "testing" "time" - "github.com/go-kit/kit/log" - "github.com/fluxcd/flux/pkg/cluster" "github.com/fluxcd/flux/pkg/cluster/kubernetes" "github.com/fluxcd/flux/pkg/cluster/kubernetes/testfiles" @@ -26,6 +24,9 @@ import ( registryMock "github.com/fluxcd/flux/pkg/registry/mock" "github.com/fluxcd/flux/pkg/resource" fluxsync "github.com/fluxcd/flux/pkg/sync" + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + promdto "github.com/prometheus/client_model/go" ) const ( @@ -86,6 +87,50 @@ func daemon(t *testing.T) (*Daemon, func()) { } } +func findMetric(name string, metricType promdto.MetricType, labels ...string) (*promdto.Metric, error) { + metricsRegistry := prometheus.DefaultRegisterer.(*prometheus.Registry) + if metrics, err := metricsRegistry.Gather(); err == nil { + for _, metricFamily := range metrics { + if *metricFamily.Name == name { + if *metricFamily.Type != metricType { + return nil, fmt.Errorf("Metric types for %v doesn't correpond: %v != %v", name, metricFamily.Type, metricType) + } + for _, metric := range metricFamily.Metric { + if len(labels) != len(metric.Label)*2 { + return nil, fmt.Errorf("Metric labels length for %v doesn't correpond: %v != %v", name, len(labels)*2, len(metric.Label)) + } + for labelIdx, label := range metric.Label { + if labels[labelIdx*2] != *label.Name { + return nil, fmt.Errorf("Metric label for %v doesn't correpond: %v != %v", name, labels[labelIdx*2], *label.Name) + } else if labels[labelIdx*2+1] != *label.Value { + break + } else if labelIdx == len(metric.Label)-1 { + return metric, nil + } + } + } + return nil, fmt.Errorf("Can't find metric %v with appropriate labels in registry", name) + } + } + return nil, fmt.Errorf("Can't find metric %v in registry", name) + } else { + return nil, fmt.Errorf("Error reading metrics registry %v", err) + } +} + +func checkSyncManifestsMetrics(t *testing.T, manifestSuccess, manifestFailures int) { + if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "true"); err != nil { + t.Errorf("Error collecting flux_daemon_sync_manifests{success='true'} metric: %v", err) + } else if int(*metric.Gauge.Value) != manifestSuccess { + t.Errorf("flux_daemon_sync_manifests{success='true'} must be %v. Got %v", manifestSuccess, *metric.Gauge.Value) + } + if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "false"); err != nil { + t.Errorf("Error collecting flux_daemon_sync_manifests{success='false'} metric: %v", err) + } else if int(*metric.Gauge.Value) != manifestFailures { + t.Errorf("flux_daemon_sync_manifests{success='false'} must be %v. Got %v", manifestFailures, *metric.Gauge.Value) + } +} + func TestPullAndSync_InitialSync(t *testing.T) { d, cleanup := daemon(t) defer cleanup() @@ -148,6 +193,9 @@ func TestPullAndSync_InitialSync(t *testing.T) { } else if len(revs) <= 0 { t.Errorf("Found no revisions before the sync tag") } + + // Check 0 error stats + checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) } func TestDoSync_NoNewCommits(t *testing.T) { @@ -357,3 +405,113 @@ func TestDoSync_WithNewCommit(t *testing.T) { t.Errorf("Should have moved sync tag to HEAD (%s), but was moved to: %s", newRevision, revs[len(revs)-1].Revision) } } + +func TestDoSync_WithErrors(t *testing.T) { + d, cleanup := daemon(t) + defer cleanup() + + expectedResourceIDs := resource.IDs{} + for id, _ := range testfiles.ResourceMap { + expectedResourceIDs = append(expectedResourceIDs, id) + } + + k8s.SyncFunc = func(def cluster.SyncSet) error { + return nil + } + + ctx := context.Background() + head, err := d.Repo.BranchHead(ctx) + if err != nil { + t.Fatal(err) + } + + syncTag := "sync" + gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig) + syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync} + + if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil { + t.Error(err) + } + + // Check 0 error stats + checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + + // Now add wrong manifest + err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error { + ctx, cancel := context.WithTimeout(ctx, 5000*time.Second) + defer cancel() + + absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml") + if err := ioutil.WriteFile(absolutePath, []byte("Manifest that must produce errors"), 0600); err != nil { + return err + } + commitAction := git.CommitAction{Author: "", Message: "test error commit"} + err = checkout.CommitAndPush(ctx, commitAction, nil, true) + if err != nil { + return err + } + return err + }) + if err != nil { + t.Fatal(err) + } + + err = d.Repo.Refresh(ctx) + if err != nil { + t.Error(err) + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + // Check error not nil, manifest counters remain the same + checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + } else { + t.Error("Sync must fail because of invalid manifest") + } + + // Fix manifest + err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error { + ctx, cancel := context.WithTimeout(ctx, 5000*time.Second) + defer cancel() + + absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml") + if err := ioutil.WriteFile(absolutePath, []byte("# Just comment"), 0600); err != nil { + return err + } + commitAction := git.CommitAction{Author: "", Message: "test fix commit"} + err = checkout.CommitAndPush(ctx, commitAction, nil, true) + if err != nil { + return err + } + return err + }) + + if err != nil { + t.Fatal(err) + } + + err = d.Repo.Refresh(ctx) + if err != nil { + t.Error(err) + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + t.Error(err) + } + // Check 0 manifest error stats + checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + + // Emulate sync errors + k8s.SyncFunc = func(def cluster.SyncSet) error { + return cluster.SyncError{ + cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl1"), "src1", fmt.Errorf("Error1")}, + cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl2"), "src2", fmt.Errorf("Error2")}, + } + } + + if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + t.Error(err) + } + + // Check 2 sync error in stats + checkSyncManifestsMetrics(t, len(expectedResourceIDs)-2, 2) +}