Skip to content

Commit

Permalink
feat: add workload-ref/generation to rollout
Browse files Browse the repository at this point in the history
- Add a "rollout.argoproj.io/workload-generation" annotation to
  the rollout metadata, which equals to the generation of reference
  workload
- workload-generation is updated when the referenced workload is
  updated.
- status.workloadObservedGeneration records the observed generation
  of the rollout

Signed-off-by: Hui Kang <[email protected]>
  • Loading branch information
Hui Kang committed May 19, 2021
1 parent 6ba30ed commit 2ebd1ae
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 2 deletions.
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func NewManager(
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer())
refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutWorkqueue, rolloutsInformer.Informer())

recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal)

Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/rollout-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2817,6 +2817,8 @@ spec:
updatedReplicas:
format: int32
type: integer
workloadObservedGeneration:
type: string
type: object
required:
- spec
Expand Down
2 changes: 2 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12506,6 +12506,8 @@ spec:
updatedReplicas:
format: int32
type: integer
workloadObservedGeneration:
type: string
type: object
required:
- spec
Expand Down
2 changes: 2 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12506,6 +12506,8 @@ spec:
updatedReplicas:
format: int32
type: integer
workloadObservedGeneration:
type: string
type: object
required:
- spec
Expand Down
4 changes: 4 additions & 0 deletions pkg/apiclient/rollout/rollout.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,10 @@
"type": "string",
"title": "The generation observed by the rollout controller by taking a hash of the spec.\n+optional"
},
"workloadObservedGeneration": {
"type": "string",
"title": "The generation of referenced workload observed by the rollout controller\n+optional"
},
"conditions": {
"type": "array",
"items": {
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/rollouts/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,9 @@ type RolloutStatus struct {
// The generation observed by the rollout controller by taking a hash of the spec.
// +optional
ObservedGeneration string `json:"observedGeneration,omitempty" protobuf:"bytes,13,opt,name=observedGeneration"`
// The generation of referenced workload observed by the rollout controller
// +optional
WorkloadObservedGeneration string `json:"workloadObservedGeneration,omitempty" protobuf:"bytes,22,opt,name=workloadObservedGeneration"`
// Conditions a list of conditions a rollout can have.
// +optional
Conditions []RolloutCondition `json:"conditions,omitempty" protobuf:"bytes,14,rep,name=conditions"`
Expand Down
8 changes: 8 additions & 0 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,14 @@ func (c *rolloutContext) persistRolloutStatus(newStatus *v1alpha1.RolloutStatus)
ctx := context.TODO()
prevStatus := c.rollout.Status
c.pauseContext.CalculatePauseStatus(newStatus)
if c.rollout.Spec.TemplateResolvedFromRef {
workloadRefObservation, _ := annotations.GetWorkloadGenerationAnnotation(c.rollout)
currentWorkloadObservedGeneration, _ := strconv.ParseInt(newStatus.WorkloadObservedGeneration, 10, 32)
if workloadRefObservation != int32(currentWorkloadObservedGeneration) {
newStatus.WorkloadObservedGeneration = strconv.Itoa(int(workloadRefObservation))
}
}

newStatus.ObservedGeneration = strconv.Itoa(int(c.rollout.Generation))
logCtx := logutil.WithVersionFields(c.log, c.rollout)
patch, modified, err := diff.CreateTwoWayMergePatch(
Expand Down
31 changes: 31 additions & 0 deletions rollout/temlateref.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
"github.com/argoproj/argo-rollouts/utils/annotations"
unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -59,13 +63,15 @@ type informerBasedTemplateResolver struct {
cancelContext context.CancelFunc
rolloutWorkQueue workqueue.Interface
rolloutsInformer cache.SharedIndexInformer
argoprojclientset clientset.Interface
}

// NewInformerBasedWorkloadRefResolver create new instance of workload ref resolver.
func NewInformerBasedWorkloadRefResolver(
namespace string,
dynamicClient dynamic.Interface,
discoClient discovery.DiscoveryInterface,
agrgoProjClientset clientset.Interface,
rolloutWorkQueue workqueue.Interface,
rolloutsInformer cache.SharedIndexInformer,
) *informerBasedTemplateResolver {
Expand All @@ -88,6 +94,7 @@ func NewInformerBasedWorkloadRefResolver(
cancelContext: cancelContext,
informerResyncDuration: time.Minute * 5,
informerSyncTimeout: time.Minute,
argoprojclientset: agrgoProjClientset,
dynamicClient: dynamicClient,
discoClient: discoClient,
rolloutWorkQueue: rolloutWorkQueue,
Expand Down Expand Up @@ -164,6 +171,14 @@ func (r *informerBasedTemplateResolver) Resolve(rollout *v1alpha1.Rollout) error
}
}

// initialize rollout workload-generation annotation
roMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
generation := strconv.FormatInt(roMeta.GetGeneration(), 10)
annotations.SetRolloutWorkloadRefGeneration(rollout, generation)

return nil
}

Expand Down Expand Up @@ -219,7 +234,23 @@ func (r *informerBasedTemplateResolver) requeueReferencedRollouts(obj interface{
if err != nil {
return
}

generation := strconv.FormatInt(roMeta.GetGeneration(), 10)
for _, ro := range rollouts {
un, ok := ro.(*unstructured.Unstructured)
if ok {
rollout := unstructuredutil.ObjectToRollout(un)
updated := annotations.SetRolloutWorkloadRefGeneration(rollout, generation)
rollout.Spec.Template.Spec.Containers = []corev1.Container{}
if updated {
_, err := r.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(context.TODO(), rollout, v1.UpdateOptions{})
if err != nil {
log.Errorf("Cannot update the workload-ref/annotation for %s/%s", rollout.GetName(), rollout.GetNamespace())
}
}

}

if key, err := cache.MetaNamespaceKeyFunc(ro); err == nil {
r.rolloutWorkQueue.Add(key)
}
Expand Down
3 changes: 2 additions & 1 deletion rollout/temlateref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func newFakeDiscoClient() *discofake.FakeDiscovery {

func newResolver(dynamicClient dynamic.Interface, discoveryClient disco.DiscoveryInterface, rolloutClient versioned.Interface) (*informerBasedTemplateResolver, context.CancelFunc) {
rolloutsInformer := rolloutinformers.NewRolloutInformer(rolloutClient, "", time.Minute, cache.Indexers{})
resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, workqueue.NewDelayingQueue(), rolloutsInformer)
argoprojectclientset := fake.Clientset{}
resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, &argoprojectclientset, workqueue.NewDelayingQueue(), rolloutsInformer)
stop := make(chan struct{})
go rolloutsInformer.Run(stop)
cache.WaitForCacheSync(stop, rolloutsInformer.HasSynced)
Expand Down
31 changes: 31 additions & 0 deletions utils/annotations/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,32 @@ const (
// in its replica sets. Helps in separating scaling events from the rollout process and for
// determining if the new replica set for a rollout is really saturated.
DesiredReplicasAnnotation = RolloutLabel + "/desired-replicas"
// WorkloadGenerationAnnotation is the generation of the referenced workload
WorkloadGenerationAnnotation = RolloutLabel + "/workload-generation"
)

// GetDesiredReplicasAnnotation returns the number of desired replicas
func GetDesiredReplicasAnnotation(rs *appsv1.ReplicaSet) (int32, bool) {
return getIntFromAnnotation(rs, DesiredReplicasAnnotation)
}

// GetWorkloadGenerationAnnotation returns generation of referenced workload
func GetWorkloadGenerationAnnotation(ro *v1alpha1.Rollout) (int32, bool) {
if ro == nil {
return 0, false
}
annotationValue, ok := ro.Annotations[WorkloadGenerationAnnotation]
if !ok {
return int32(0), false
}
intValue, err := strconv.ParseInt(annotationValue, 10, 32)
if err != nil {
log.Warnf("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, WorkloadGenerationAnnotation, ro.Name)
return int32(0), false
}
return int32(intValue), true
}

func getIntFromAnnotation(rs *appsv1.ReplicaSet, annotationKey string) (int32, bool) {
if rs == nil {
return 0, false
Expand Down Expand Up @@ -60,6 +79,18 @@ func SetRolloutRevision(rollout *v1alpha1.Rollout, revision string) bool {
return false
}

// SetRolloutRevision updates the revision for a rollout.
func SetRolloutWorkloadRefGeneration(rollout *v1alpha1.Rollout, workloadGeneration string) bool {
if rollout.Annotations == nil {
rollout.Annotations = make(map[string]string)
}
if rollout.Annotations[WorkloadGenerationAnnotation] != workloadGeneration {
rollout.Annotations[WorkloadGenerationAnnotation] = workloadGeneration
return true
}
return false
}

// SetReplicasAnnotations sets the desiredReplicas into the annotations
func SetReplicasAnnotations(rs *appsv1.ReplicaSet, desiredReplicas int32) bool {
if rs.Annotations == nil {
Expand Down
69 changes: 69 additions & 0 deletions utils/annotations/annotations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,43 @@ func TestAnnotationUtils(t *testing.T) {
}
})

// Check if WorkloadRefGeneration annotations can be set
t.Run("SetRolloutWorkloadRefGeneration", func(t *testing.T) {
copyRollout := tRollout.DeepCopy()
copyRollout.Annotations = nil
updated := SetRolloutWorkloadRefGeneration(copyRollout, "1")
if !updated {
t.Errorf("SetRolloutWorkloadRefGeneration() Expected=True Obtained=False")
}
if copyRollout.Annotations[WorkloadGenerationAnnotation] != "1" {
t.Errorf("Revision Expected=1 Obtained=%s", copyRollout.Annotations[WorkloadGenerationAnnotation])
}
})

t.Run("SetRolloutWorkloadRefGenerationAlreadySet", func(t *testing.T) {
copyRollout := tRollout.DeepCopy()
copyRollout.Annotations = map[string]string{WorkloadGenerationAnnotation: "1"}
updated := SetRolloutWorkloadRefGeneration(copyRollout, "2")
if !updated {
t.Errorf("SetRolloutWorkloadRefGeneration() Expected=True Obtained=False")
}
if copyRollout.Annotations[WorkloadGenerationAnnotation] != "2" {
t.Errorf("WorkloadGeneration Expected=1 Obtained=%s", copyRollout.Annotations[WorkloadGenerationAnnotation])
}
})

t.Run("SetRolloutWorkloadRefGenerationUnchanged", func(t *testing.T) {
copyRollout := tRollout.DeepCopy()
copyRollout.Annotations = map[string]string{WorkloadGenerationAnnotation: "2"}
updated := SetRolloutWorkloadRefGeneration(copyRollout, "2")
if updated {
t.Errorf("SetRolloutWorkloadRefGeneration() Expected=False Obtained=True")
}
if copyRollout.Annotations[WorkloadGenerationAnnotation] != "2" {
t.Errorf("WorkloadGeneration Expected=2 Obtained=%s", copyRollout.Annotations[WorkloadGenerationAnnotation])
}
})

t.Run("SetRolloutRevisionAlreadySet", func(t *testing.T) {
copyRollout := tRollout.DeepCopy()
copyRollout.Labels = map[string]string{RevisionAnnotation: "2"}
Expand Down Expand Up @@ -237,6 +274,38 @@ func TestAnnotationUtils(t *testing.T) {
}
})

// Check if we can grab annotations from rollout
t.Run("GetDesiredReplicasAnnotationNotSet", func(t *testing.T) {
generation, ok := GetWorkloadGenerationAnnotation(&tRollout)
assert.False(t, ok)
assert.Equal(t, int32(0), generation)
})

tRollout.Annotations[WorkloadGenerationAnnotation] = "1"
t.Run("GetDesiredReplicasAnnotation", func(t *testing.T) {
generation, ok := GetWorkloadGenerationAnnotation(&tRollout)
assert.True(t, ok)
assert.Equal(t, int32(1), generation)
})

tRollout.Annotations[WorkloadGenerationAnnotation] = "20000000000"
t.Run("GetDesiredReplicasAnnotationOutOfRange", func(t *testing.T) {
_, ok := GetWorkloadGenerationAnnotation(&tRollout)
assert.Falsef(t, ok, "Should be an error as 20M value does not fit into int32")
})

t.Run("GetWorkloadGenerationAnnotationNilInput", func(t *testing.T) {
generation, ok := GetWorkloadGenerationAnnotation(nil)
assert.False(t, ok)
assert.Equal(t, int32(0), generation)
})

tRollout.Annotations[WorkloadGenerationAnnotation] = "Not a number"
t.Run("GetWorkloadGenerationAnnotationInvalidAnnotations", func(t *testing.T) {
_, ok := GetWorkloadGenerationAnnotation(&tRollout)
assert.False(t, ok)
})

copyRS := tRS.DeepCopy()
copyRS.Annotations = nil
t.Run("GetDesiredReplicasAnnotationNoAnnotations", func(t *testing.T) {
Expand Down

0 comments on commit 2ebd1ae

Please sign in to comment.