From ec57ec9ab26d77ba42f62cd068e86a0ec2ada02a Mon Sep 17 00:00:00 2001 From: Pavel Moukhataev Date: Thu, 24 Oct 2019 13:49:59 +0300 Subject: [PATCH] Code review - https://github.com/fluxcd/flux/pull/2535#pullrequestreview-305818012 --- docs/references/monitoring.md | 11 +++++- go.mod | 8 +++++ pkg/daemon/loop.go | 7 +--- pkg/daemon/sync.go | 15 +++++--- pkg/daemon/sync_test.go | 67 +++++++++++++++++++++++++---------- 5 files changed, 78 insertions(+), 30 deletions(-) diff --git a/docs/references/monitoring.md b/docs/references/monitoring.md index 0fa2208a4..9cbc10f0f 100644 --- a/docs/references/monitoring.md +++ b/docs/references/monitoring.md @@ -14,6 +14,15 @@ The following metrics are exposed: | `flux_daemon_queue_duration_seconds` | Duration of time spent in the job queue before execution | `flux_daemon_queue_length_count` | Count of jobs waiting in the queue to be run | `flux_daemon_sync_duration_seconds` | Duration of git-to-cluster synchronisation -| `flux_daemon_sync_manifests` | Number of successfully synchronized manifests, number of errors +| `flux_daemon_sync_manifests` | Number of successfully synchronized manifests (`success='true'`) and of manifest sync errors (`success='false'`) | `flux_registry_fetch_duration_seconds` | Duration of image metadata requests (from cache) | `flux_fluxd_connection_duration_seconds` | Duration in seconds of the current connection to fluxsvc + +Flux sync state can be obtained by using the following PromQL expressions: +* `delta(flux_daemon_sync_duration_seconds_count{success='true'}[6m]) < 1` - for general flux sync errors - usually if +that is true then there are problems with the infrastructure, manifest parse errors, or manifests +with duplicate ids. + +* `flux_daemon_sync_manifests{success='false'} > 0` - for git manifest errors - if true then there are some +problems with applying git manifests to Kubernetes - e.g. a configmap is too big to fit in annotations or an +immutable field (like a label selector) was changed. 
diff --git a/go.mod b/go.mod index 15e27bd77..e9791a2f9 100644 --- a/go.mod +++ b/go.mod @@ -13,15 +13,19 @@ require ( github.com/ghodss/yaml v1.0.0 github.com/go-kit/kit v0.9.0 github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0 + github.com/googleapis/gnostic v0.3.0 // indirect github.com/gorilla/mux v1.7.1 github.com/gorilla/websocket v1.4.0 + github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/imdario/mergo v0.3.7 github.com/instrumenta/kubeval v0.0.0-20190804145309-805845b47dfc github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 github.com/pkg/errors v0.8.1 github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 github.com/prometheus/client_golang v1.1.0 + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/ryanuber/go-glob v1.0.0 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd @@ -31,6 +35,9 @@ require ( github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect + golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect + golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 gopkg.in/yaml.v2 v2.2.2 @@ -40,6 +47,7 @@ require ( k8s.io/client-go v11.0.0+incompatible k8s.io/helm v2.13.1+incompatible k8s.io/klog v0.3.3 + k8s.io/utils v0.0.0-20190712204705-3dccf664f023 // indirect ) replace github.com/docker/distribution => github.com/2opremio/distribution v0.0.0-20190419185413-6c9727e5e5de diff --git a/pkg/daemon/loop.go b/pkg/daemon/loop.go index fca18dcdc..ac67b1e6c 100644 --- 
a/pkg/daemon/loop.go +++ b/pkg/daemon/loop.go @@ -92,12 +92,7 @@ func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) default: } } - started := time.Now().UTC() - err := d.Sync(context.Background(), started, syncHead, ratchet) - syncDuration.With( - fluxmetrics.LabelSuccess, fmt.Sprint(err == nil), - ).Observe(time.Since(started).Seconds()) - if err != nil { + if err := d.SyncAndMeasure(context.Background(), syncHead, ratchet); err != nil { logger.Log("err", err) } syncTimer.Reset(d.SyncInterval) diff --git a/pkg/daemon/sync.go b/pkg/daemon/sync.go index 287a60382..bf9bb91a4 100644 --- a/pkg/daemon/sync.go +++ b/pkg/daemon/sync.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "fmt" "github.com/fluxcd/flux/pkg/metrics" "github.com/go-kit/kit/log" "github.com/pkg/errors" @@ -40,6 +41,15 @@ type changeSet struct { } // Sync starts the synchronization of the cluster with git. +func (d *Daemon) SyncAndMeasure(ctx context.Context, newRevision string, ratchet revisionRatchet) error { + started := time.Now().UTC() + err := d.Sync(ctx, started, newRevision, ratchet) // pass the caller's ctx through so its cancellation and deadline reach Sync + syncDuration.With( + metrics.LabelSuccess, fmt.Sprint(err == nil), + ).Observe(time.Since(started).Seconds()) + return err +} + func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string, ratchet revisionRatchet) error { // Make a read-only clone used for this sync ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout) @@ -54,7 +64,6 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string if d.GitSecretEnabled { ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout) if err := working.SecretUnseal(ctxt); err != nil { - updateSyncManifestsMetric(0, 1) return err } cancel() @@ -63,7 +72,6 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string // Retrieve change set of commits we need to sync c, err := getChangeSet(ctx, ratchet, newRevision, d.Repo, d.GitTimeout, 
d.GitConfig.Paths) if err != nil { - updateSyncManifestsMetric(0, 1) return err } @@ -71,7 +79,6 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig) resourceStore, err := d.getManifestStore(working) if err != nil { - updateSyncManifestsMetric(0, 1) return errors.Wrap(err, "reading the repository checkout") } resources, resourceErrors, err := doSync(ctx, resourceStore, d.Cluster, syncSetName, d.Logger) @@ -153,7 +160,6 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) { resources, err := manifestsStore.GetAllResourcesByID(ctx) if err != nil { - updateSyncManifestsMetric(0, 1) return nil, nil, errors.Wrap(err, "loading resources from repo") } @@ -171,7 +177,6 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl }) } default: - updateSyncManifestsMetric(0, 1) return nil, nil, err } } else { diff --git a/pkg/daemon/sync_test.go b/pkg/daemon/sync_test.go index ac31c2eae..166016831 100644 --- a/pkg/daemon/sync_test.go +++ b/pkg/daemon/sync_test.go @@ -118,22 +118,51 @@ func findMetric(name string, metricType promdto.MetricType, labels ...string) (* } } -func checkSyncManifestsMetrics(t *testing.T, success, failures int) { +func checkSyncManifestsMetrics(t *testing.T, syncSuccess, syncFailures, manifestSuccess, manifestFailures int) { if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "true"); err != nil { - t.Errorf("Error collecting metrics %v", err) - } else if int(*metric.Gauge.Value) != success { - t.Errorf("Manifest success must be %v. Got %v", success, *metric.Gauge.Value) + t.Errorf("Error collecting flux_daemon_sync_manifests{success='true'} metric: %v", err) + } else if int(*metric.Gauge.Value) != manifestSuccess { + t.Errorf("flux_daemon_sync_manifests{success='true'} must be %v. 
Got %v", manifestSuccess, *metric.Gauge.Value) } if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "false"); err != nil { - t.Errorf("Error collecting metrics %v", err) - } else if int(*metric.Gauge.Value) != failures { - t.Errorf("Manifest sync errors must be %v. Got %v", failures, *metric.Gauge.Value) + t.Errorf("Error collecting flux_daemon_sync_manifests{success='false'} metric: %v", err) + } else if int(*metric.Gauge.Value) != manifestFailures { + t.Errorf("flux_daemon_sync_manifests{success='false'} must be %v. Got %v", manifestFailures, *metric.Gauge.Value) + } + if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "true"); err != nil { + if syncSuccess > 0 { + t.Errorf("Error collecting flux_daemon_sync_duration_seconds_count{success='true'} metric: %v", err) + } + } else if int(*metric.Histogram.SampleCount) != syncSuccess { + t.Errorf("flux_daemon_sync_duration_seconds_count{success='true'} must be %v. Got %v", syncSuccess, *metric.Histogram.SampleCount) + } + if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "false"); err != nil { + if syncFailures > 0 { + t.Errorf("Error collecting flux_daemon_sync_duration_seconds_count{success='false'} metric: %v", err) + } + } else if int(*metric.Histogram.SampleCount) != syncFailures { + t.Errorf("flux_daemon_sync_duration_seconds_count{success='false'} must be %v. 
Got %v", syncFailures, *metric.Histogram.SampleCount) } } +func getSyncCount(t *testing.T) (success int, failure int) { + if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "true"); err != nil { + success = 0 + } else { + success = int(*metric.Histogram.SampleCount) + } + if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "false"); err != nil { + failure = 0 + } else { + failure = int(*metric.Histogram.SampleCount) + } + return +} + func TestPullAndSync_InitialSync(t *testing.T) { d, cleanup := daemon(t) defer cleanup() + initialSyncSuccess, initialSyncFailures := getSyncCount(t) syncCalled := 0 var syncDef *cluster.SyncSet @@ -158,7 +187,7 @@ func TestPullAndSync_InitialSync(t *testing.T) { gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig) syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync} - if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil { + if err := d.SyncAndMeasure(ctx, head, syncState); err != nil { t.Error(err) } @@ -195,7 +224,7 @@ func TestPullAndSync_InitialSync(t *testing.T) { } // Check 0 error stats - checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures, len(expectedResourceIDs), 0) } func TestDoSync_NoNewCommits(t *testing.T) { @@ -410,6 +439,8 @@ func TestDoSync_WithErrors(t *testing.T) { d, cleanup := daemon(t) defer cleanup() + initialSyncSuccess, initialSyncFailures := getSyncCount(t) + expectedResourceIDs := resource.IDs{} for id, _ := range testfiles.ResourceMap { expectedResourceIDs = append(expectedResourceIDs, id) @@ -429,12 +460,12 @@ func TestDoSync_WithErrors(t *testing.T) { gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig) syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync} - if err := d.Sync(ctx, time.Now().UTC(), head, 
syncState); err != nil { + if err := d.SyncAndMeasure(ctx, head, syncState); err != nil { t.Error(err) } // Check 0 error stats - checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures, len(expectedResourceIDs), 0) // Now add wrong manifest err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error { @@ -461,9 +492,9 @@ func TestDoSync_WithErrors(t *testing.T) { t.Error(err) } - if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { - // Check 1 error stats - checkSyncManifestsMetrics(t, 0, 1) + if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil { + // Check 1 error stats, manifest remains the same + checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures+1, len(expectedResourceIDs), 0) } else { t.Error("Sync must fail because of invalid manifest") } @@ -494,11 +525,11 @@ func TestDoSync_WithErrors(t *testing.T) { t.Error(err) } - if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil { t.Error(err) } // Check 0 manifest error stats - checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0) + checkSyncManifestsMetrics(t, initialSyncSuccess+2, initialSyncFailures+1, len(expectedResourceIDs), 0) // Emulate sync errors k8s.SyncFunc = func(def cluster.SyncSet) error { @@ -508,10 +539,10 @@ func TestDoSync_WithErrors(t *testing.T) { } } - if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil { + if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil { t.Error(err) } // Check 2 sync error in stats - checkSyncManifestsMetrics(t, len(expectedResourceIDs)-2, 2) + checkSyncManifestsMetrics(t, initialSyncSuccess+3, initialSyncFailures+1, len(expectedResourceIDs)-2, 2) }