Skip to content

Commit

Permalink
Code review - fluxcd#2535 (review)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpashka committed Oct 24, 2019
1 parent 6a327f5 commit ec57ec9
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 30 deletions.
11 changes: 10 additions & 1 deletion docs/references/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ The following metrics are exposed:
| `flux_daemon_queue_duration_seconds` | Duration of time spent in the job queue before execution
| `flux_daemon_queue_length_count` | Count of jobs waiting in the queue to be run
| `flux_daemon_sync_duration_seconds` | Duration of git-to-cluster synchronisation
| `flux_daemon_sync_manifests` | Number of successfully synchronized manifests, number of errors
| `flux_daemon_sync_manifests` | Number of successfully synchronized manifests, manifest sync errors
| `flux_registry_fetch_duration_seconds` | Duration of image metadata requests (from cache)
| `flux_fluxd_connection_duration_seconds` | Duration in seconds of the current connection to fluxsvc

Flux sync state can be obtained by using the following PromQL expressions:
* `delta(flux_daemon_sync_duration_seconds_count{success='true'}[6m]) < 1` - for general flux sync errors - usually if
that is true then there are some problems with infrastructure or there are manifests parse error or there are manifests
with duplicate ids.

* `flux_daemon_sync_manifests{success='false'} > 0` - for git manifests errors - if true then there are either some
problems with applying git manifests to kubernetes - e.g. configmap size is too big to fit in annotations or
immutable field (like label selector) was changed.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/go-kit/kit v0.9.0
github.com/golang/gddo v0.0.0-20190312205958-5a2505f3dbf0
github.com/googleapis/gnostic v0.3.0 // indirect
github.com/gorilla/mux v1.7.1
github.com/gorilla/websocket v1.4.0
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7
github.com/instrumenta/kubeval v0.0.0-20190804145309-805845b47dfc
github.com/justinbarrick/go-k8s-portforward v1.0.4-0.20190722134107-d79fe1b9d79d
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1
github.com/pkg/errors v0.8.1
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/ryanuber/go-glob v1.0.0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
Expand All @@ -31,6 +35,9 @@ require (
github.com/weaveworks/common v0.0.0-20190410110702-87611edc252e
github.com/weaveworks/go-checkpoint v0.0.0-20170503165305-ebbb8b0518ab
github.com/whilp/git-urls v0.0.0-20160530060445-31bac0d230fa
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v2 v2.2.2
Expand All @@ -40,6 +47,7 @@ require (
k8s.io/client-go v11.0.0+incompatible
k8s.io/helm v2.13.1+incompatible
k8s.io/klog v0.3.3
k8s.io/utils v0.0.0-20190712204705-3dccf664f023 // indirect
)

replace github.com/docker/distribution => github.com/2opremio/distribution v0.0.0-20190419185413-6c9727e5e5de
Expand Down
7 changes: 1 addition & 6 deletions pkg/daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,7 @@ func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger)
default:
}
}
started := time.Now().UTC()
err := d.Sync(context.Background(), started, syncHead, ratchet)
syncDuration.With(
fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
).Observe(time.Since(started).Seconds())
if err != nil {
if err := d.SyncAndMeasure(context.Background(), syncHead, ratchet); err != nil {
logger.Log("err", err)
}
syncTimer.Reset(d.SyncInterval)
Expand Down
15 changes: 10 additions & 5 deletions pkg/daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"github.com/fluxcd/flux/pkg/metrics"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -40,6 +41,15 @@ type changeSet struct {
}

// Sync starts the synchronization of the cluster with git.
func (d *Daemon) SyncAndMeasure(ctx context.Context, newRevision string, ratchet revisionRatchet) error {
started := time.Now().UTC()
err := d.Sync(context.Background(), started, newRevision, ratchet)
syncDuration.With(
metrics.LabelSuccess, fmt.Sprint(err == nil),
).Observe(time.Since(started).Seconds())
return err
}

func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string, ratchet revisionRatchet) error {
// Make a read-only clone used for this sync
ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout)
Expand All @@ -54,7 +64,6 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string
if d.GitSecretEnabled {
ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout)
if err := working.SecretUnseal(ctxt); err != nil {
updateSyncManifestsMetric(0, 1)
return err
}
cancel()
Expand All @@ -63,15 +72,13 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, newRevision string
// Retrieve change set of commits we need to sync
c, err := getChangeSet(ctx, ratchet, newRevision, d.Repo, d.GitTimeout, d.GitConfig.Paths)
if err != nil {
updateSyncManifestsMetric(0, 1)
return err
}

// Run actual sync of resources on cluster
syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig)
resourceStore, err := d.getManifestStore(working)
if err != nil {
updateSyncManifestsMetric(0, 1)
return errors.Wrap(err, "reading the repository checkout")
}
resources, resourceErrors, err := doSync(ctx, resourceStore, d.Cluster, syncSetName, d.Logger)
Expand Down Expand Up @@ -153,7 +160,6 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
logger log.Logger) (map[string]resource.Resource, []event.ResourceError, error) {
resources, err := manifestsStore.GetAllResourcesByID(ctx)
if err != nil {
updateSyncManifestsMetric(0, 1)
return nil, nil, errors.Wrap(err, "loading resources from repo")
}

Expand All @@ -171,7 +177,6 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
})
}
default:
updateSyncManifestsMetric(0, 1)
return nil, nil, err
}
} else {
Expand Down
67 changes: 49 additions & 18 deletions pkg/daemon/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,51 @@ func findMetric(name string, metricType promdto.MetricType, labels ...string) (*
}
}

func checkSyncManifestsMetrics(t *testing.T, success, failures int) {
func checkSyncManifestsMetrics(t *testing.T, syncSuccess, syncFailures, manifestSuccess, manifestFailures int) {
if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "true"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != success {
t.Errorf("Manifest success must be %v. Got %v", success, *metric.Gauge.Value)
t.Errorf("Error collecting flux_daemon_sync_manifests{success='true'} metric: %v", err)
} else if int(*metric.Gauge.Value) != manifestSuccess {
t.Errorf("flux_daemon_sync_manifests{success='true'} must be %v. Got %v", manifestSuccess, *metric.Gauge.Value)
}
if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "false"); err != nil {
t.Errorf("Error collecting metrics %v", err)
} else if int(*metric.Gauge.Value) != failures {
t.Errorf("Manifest sync errors must be %v. Got %v", failures, *metric.Gauge.Value)
t.Errorf("Error collecting flux_daemon_sync_manifests{success='false'} metric: %v", err)
} else if int(*metric.Gauge.Value) != manifestFailures {
t.Errorf("flux_daemon_sync_manifests{success='false'} must be %v. Got %v", manifestFailures, *metric.Gauge.Value)
}
if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "true"); err != nil {
if syncSuccess > 0 {
t.Errorf("Error collecting flux_daemon_sync_duration_seconds_count{success='true'} metric: %v", err)
}
} else if int(*metric.Histogram.SampleCount) != syncSuccess {
t.Errorf("flux_daemon_sync_duration_seconds_count{success='true'} must be %v. Got %v", syncSuccess, *metric.Histogram.SampleCount)
}
if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "false"); err != nil {
if syncFailures > 0 {
t.Errorf("Error collecting flux_daemon_sync_duration_seconds_count{success='false'} metric: %v", err)
}
} else if int(*metric.Histogram.SampleCount) != syncFailures {
t.Errorf("flux_daemon_sync_duration_seconds_count{success='false'} must be %v. Got %v", syncFailures, *metric.Histogram.SampleCount)
}
}

func getSyncCount(t *testing.T) (success int, failure int) {
if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "true"); err != nil {
success = 0
} else {
success = int(*metric.Histogram.SampleCount)
}
if metric, err := findMetric("flux_daemon_sync_duration_seconds", promdto.MetricType_HISTOGRAM, "success", "false"); err != nil {
failure = 0
} else {
failure = int(*metric.Histogram.SampleCount)
}
return
}

func TestPullAndSync_InitialSync(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()
initialSyncSuccess, initialSyncFailures := getSyncCount(t)

syncCalled := 0
var syncDef *cluster.SyncSet
Expand All @@ -158,7 +187,7 @@ func TestPullAndSync_InitialSync(t *testing.T) {
gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig)
syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync}

if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil {
if err := d.SyncAndMeasure(ctx, head, syncState); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -195,7 +224,7 @@ func TestPullAndSync_InitialSync(t *testing.T) {
}

// Check 0 error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures, len(expectedResourceIDs), 0)
}

func TestDoSync_NoNewCommits(t *testing.T) {
Expand Down Expand Up @@ -410,6 +439,8 @@ func TestDoSync_WithErrors(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()

initialSyncSuccess, initialSyncFailures := getSyncCount(t)

expectedResourceIDs := resource.IDs{}
for id, _ := range testfiles.ResourceMap {
expectedResourceIDs = append(expectedResourceIDs, id)
Expand All @@ -429,12 +460,12 @@ func TestDoSync_WithErrors(t *testing.T) {
gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig)
syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync}

if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil {
if err := d.SyncAndMeasure(ctx, head, syncState); err != nil {
t.Error(err)
}

// Check 0 error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures, len(expectedResourceIDs), 0)

// Now add wrong manifest
err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
Expand All @@ -461,9 +492,9 @@ func TestDoSync_WithErrors(t *testing.T) {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
// Check 1 error stats
checkSyncManifestsMetrics(t, 0, 1)
if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil {
// Check 1 error stats, manifest remains the same
checkSyncManifestsMetrics(t, initialSyncSuccess+1, initialSyncFailures+1, len(expectedResourceIDs), 0)
} else {
t.Error("Sync must fail because of invalid manifest")
}
Expand Down Expand Up @@ -494,11 +525,11 @@ func TestDoSync_WithErrors(t *testing.T) {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil {
t.Error(err)
}
// Check 0 manifest error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
checkSyncManifestsMetrics(t, initialSyncSuccess+2, initialSyncFailures+1, len(expectedResourceIDs), 0)

// Emulate sync errors
k8s.SyncFunc = func(def cluster.SyncSet) error {
Expand All @@ -508,10 +539,10 @@ func TestDoSync_WithErrors(t *testing.T) {
}
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
if err := d.SyncAndMeasure(ctx, "HEAD", syncState); err != nil {
t.Error(err)
}

// Check 2 sync error in stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs)-2, 2)
checkSyncManifestsMetrics(t, initialSyncSuccess+3, initialSyncFailures+1, len(expectedResourceIDs)-2, 2)
}

0 comments on commit ec57ec9

Please sign in to comment.