Skip to content

Commit

Permalink
Merge branch 'master' into multiple-virtual-services-support
Browse files Browse the repository at this point in the history
  • Loading branch information
darshanhs09 authored Aug 27, 2021
2 parents 0c0871c + 89062e3 commit 5cdb12e
Show file tree
Hide file tree
Showing 143 changed files with 7,291 additions and 4,871 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ on:
branches:
- 'master'
- 'release-*'
workflow_dispatch:
inputs:
debug_enabled:
description: 'Run the build with tmate debugging enabled (https://github.com/marketplace/actions/debugging-with-tmate)'
required: false
default: false

jobs:
test-e2e:
name: Run end-to-end tests
Expand Down Expand Up @@ -36,8 +43,12 @@ jobs:
kubectl apply -f test/e2e/crds
- name: Start controller
run: make start-e2e 2>&1 | sed -r "s/[[:cntrl:]]\[[0-9]{1,3}m//g" > /tmp/e2e-controller.log &
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug_enabled}}
- name: Run e2e tests
run: make test-e2e
if: ${{ !(github.event_name == 'workflow_dispatch' && github.event.inputs.debug_enabled) }}
- name: Upload e2e-controller logs
uses: actions/upload-artifact@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
path: coverage.out

- name: Upload code coverage information to codecov.io
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v2.0.3
with:
file: coverage.out

Expand Down
14 changes: 11 additions & 3 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ jobs:
with:
ref: ${{ github.event.inputs.tag }}

- name: Get SHA
id: get-sha
run: echo "::set-output name=sha::$(git log -1 --format='%H')"

- name: Set up QEMU
uses: docker/setup-qemu-action@v1

Expand All @@ -27,9 +31,7 @@ jobs:
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
key: ${{ runner.os }}-buildx-${{ steps.get-sha.outputs.sha }}

- name: Print Disk Usage
run: |
Expand All @@ -45,6 +47,8 @@ jobs:
ghcr.io/argoproj/argo-rollouts
tags: |
type=semver,pattern={{version}},prefix=v,value=${{ github.event.inputs.tag }}
flavor: |
latest=false
- name: Docker meta (plugin)
id: plugin-meta
Expand All @@ -55,6 +59,8 @@ jobs:
ghcr.io/argoproj/kubectl-argo-rollouts
tags: |
type=semver,pattern={{version}},prefix=v,value=${{ github.event.inputs.tag }}
flavor: |
latest=false
- name: Login to GitHub Container Registry
if: github.event_name != 'pull_request'
Expand All @@ -75,6 +81,7 @@ jobs:
- name: Build and push (controller-image)
uses: docker/build-push-action@v2
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.controller-meta.outputs.tags }}
Expand All @@ -84,6 +91,7 @@ jobs:
- name: Build and push (plugin-image)
uses: docker/build-push-action@v2
with:
context: .
target: kubectl-argo-rollouts
platforms: linux/amd64,linux/arm64
push: true
Expand Down
1 change: 1 addition & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ approvers:

reviewers:
- dthomson25
- huikang
89 changes: 66 additions & 23 deletions analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"k8s.io/utils/pointer"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +45,31 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph

if run.Status.MetricResults == nil {
run.Status.MetricResults = make([]v1alpha1.MetricResult, 0)
err := analysisutil.ValidateMetrics(run.Spec.Metrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}
}

tasks := generateMetricTasks(run)
resolvedMetrics, err := getResolvedMetricsWithoutSecrets(run.Spec.Metrics, run.Spec.Args)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

err = analysisutil.ValidateMetrics(resolvedMetrics)
if err != nil {
message := fmt.Sprintf("analysis spec invalid: %v", err)
log.Warn(message)
run.Status.Phase = v1alpha1.AnalysisPhaseError
run.Status.Message = message
c.recordAnalysisRunCompletionEvent(run)
return run
}

tasks := generateMetricTasks(run, resolvedMetrics)
log.Infof("taking %d measurements", len(tasks))
err := c.runMeasurements(run, tasks)
err = c.runMeasurements(run, tasks)
if err != nil {
message := fmt.Sprintf("unable to resolve metric arguments: %v", err)
log.Warn(message)
Expand All @@ -66,7 +79,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

newStatus, newMessage := c.assessRunStatus(run)
newStatus, newMessage := c.assessRunStatus(run, resolvedMetrics)
if newStatus != run.Status.Phase {
run.Status.Phase = newStatus
run.Status.Message = newMessage
Expand All @@ -81,7 +94,7 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
log.Warnf("Failed to garbage collect measurements: %v", err)
}

nextReconcileTime := calculateNextReconcileTime(run)
nextReconcileTime := calculateNextReconcileTime(run, resolvedMetrics)
if nextReconcileTime != nil {
enqueueSeconds := nextReconcileTime.Sub(time.Now())
if enqueueSeconds < 0 {
Expand All @@ -93,6 +106,27 @@ func (c *Controller) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alph
return run
}

func getResolvedMetricsWithoutSecrets(metrics []v1alpha1.Metric, args []v1alpha1.Argument) ([]v1alpha1.Metric, error) {
newArgs := make([]v1alpha1.Argument, 0)
for _, arg := range args {
newArg := arg.DeepCopy()
if newArg.ValueFrom != nil && newArg.ValueFrom.SecretKeyRef != nil {
newArg.ValueFrom = nil
newArg.Value = pointer.StringPtr("temp-for-secret")
}
newArgs = append(newArgs, *newArg)
}
resolvedMetrics := make([]v1alpha1.Metric, 0)
for _, metric := range metrics {
resolvedMetric, err := analysisutil.ResolveMetricArgs(metric, newArgs)
if err != nil {
return nil, err
}
resolvedMetrics = append(resolvedMetrics, *resolvedMetric)
}
return resolvedMetrics, nil
}

func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun) {
eventType := corev1.EventTypeNormal
switch run.Status.Phase {
Expand All @@ -106,11 +140,12 @@ func (c *Controller) recordAnalysisRunCompletionEvent(run *v1alpha1.AnalysisRun)
// sync, based on the last completion times that metric was measured (if ever). If the run is
// terminating (e.g. due to manual termination or failing metric), will not schedule further
// measurements other than to resume any in-flight measurements.
func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
func generateMetricTasks(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) []metricTask {
log := logutil.WithAnalysisRun(run)
var tasks []metricTask
terminating := analysisutil.IsTerminating(run)
for _, metric := range run.Spec.Metrics {

for i, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
continue
}
Expand All @@ -124,7 +159,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
// last measurement is still in-progress. need to complete it
logCtx.Infof("resuming in-progress measurement")
tasks = append(tasks, metricTask{
metric: metric,
metric: run.Spec.Metrics[i],
incompleteMeasurement: lastMeasurement,
})
continue
Expand All @@ -149,7 +184,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
}
}
// measurement never taken
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running initial measurement")
continue
}
Expand All @@ -174,7 +209,7 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
interval = metricInterval
}
if time.Now().After(lastMeasurement.FinishedAt.Add(interval)) {
tasks = append(tasks, metricTask{metric: metric})
tasks = append(tasks, metricTask{metric: run.Spec.Metrics[i]})
logCtx.Infof("running overdue measurement")
continue
}
Expand Down Expand Up @@ -238,7 +273,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
var resultsLock sync.Mutex
terminating := analysisutil.IsTerminating(run)

// resolve args for metricTasks
// resolve args for metric tasks
// get list of secret values for log redaction
tasks, secrets, err := c.resolveArgs(tasks, run.Spec.Args, run.Namespace)
if err != nil {
Expand Down Expand Up @@ -345,7 +380,7 @@ func (c *Controller) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTa
// assessRunStatus assesses the overall status of this AnalysisRun
// If any metric is not yet completed, the AnalysisRun is still considered Running
// Once all metrics are complete, the worst status is used as the overall AnalysisRun status
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.AnalysisPhase, string) {
func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) (v1alpha1.AnalysisPhase, string) {
var worstStatus v1alpha1.AnalysisPhase
var worstMessage string
terminating := analysisutil.IsTerminating(run)
Expand All @@ -360,7 +395,7 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
}

// Iterate all metrics and update MetricResult.Phase fields based on latest measurement(s)
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if result := analysisutil.GetResult(run, metric.Name); result != nil {
log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name)
metricStatus := assessMetricStatus(metric, *result, terminating)
Expand Down Expand Up @@ -396,6 +431,14 @@ func (c *Controller) assessRunStatus(run *v1alpha1.AnalysisRun) (v1alpha1.Analys
}
}
}
} else {
// metric hasn't started running. possible cases where some of the metrics starts with delay
everythingCompleted = false
if terminating {
// we have yet to take a single measurement, but have already been instructed to stop
log.Infof("metric assessed %s: run terminated", v1alpha1.AnalysisPhaseSuccessful)
return v1alpha1.AnalysisPhaseSuccessful, worstMessage
}
}
}
if !everythingCompleted {
Expand Down Expand Up @@ -489,9 +532,9 @@ func assessMetricFailureInconclusiveOrError(metric v1alpha1.Metric, result v1alp

// calculateNextReconcileTime calculates the next time that this AnalysisRun should be reconciled,
// based on the earliest time of all metrics intervals, counts, and their finishedAt timestamps
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time {
func calculateNextReconcileTime(run *v1alpha1.AnalysisRun, metrics []v1alpha1.Metric) *time.Time {
var reconcileTime *time.Time
for _, metric := range run.Spec.Metrics {
for _, metric := range metrics {
if analysisutil.MetricCompleted(run, metric.Name) {
// NOTE: this also covers the case where metric.Count is reached
continue
Expand Down
Loading

0 comments on commit 5cdb12e

Please sign in to comment.