Skip to content

Commit

Permalink
Change requeue rollout to updating its annotations on workload ref ch…
Browse files Browse the repository at this point in the history
…ange

- doc update: migration.md
- Remove the workqueue of informerBasedTemplateResolver

Signed-off-by: Hui Kang <[email protected]>
  • Loading branch information
Hui Kang committed Jul 2, 2021
1 parent 96473c8 commit 8bd3ab6
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 46 deletions.
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewManager(
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutWorkqueue, rolloutsInformer.Informer())
refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer())
apiFactory := api.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), secretInformer.Informer(), configMapInformer.Informer())
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, apiFactory)
notificationsController := controller.NewController(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
Expand Down
2 changes: 2 additions & 0 deletions docs/migrating.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ After creation Rollout will spinup required number of Pods side-by-side with the
Rollout won't try to manage existing Deployment Pods. That means you can safely update add Rollout
to the production environment without any interruption but you are going to run twice more Pods during migration.

Argo-rollouts controller patches the spec of rollout object with an annotation of `rollout.argoproj.io/workload-generation`, which equals the generation of referenced deployment. Users can detect if the rollout matches desired generation of deployment by checking the `workloadObservedGeneration` in the rollout status.

**Traffic Management During Migration**

The Rollout offers traffic management functionality that manages routing rules and flows the traffic to different
Expand Down
2 changes: 2 additions & 0 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ func (c *rolloutContext) persistRolloutStatus(newStatus *v1alpha1.RolloutStatus)
if workloadRefObservation != int32(currentWorkloadObservedGeneration) {
newStatus.WorkloadObservedGeneration = strconv.Itoa(int(workloadRefObservation))
}
} else {
newStatus.WorkloadObservedGeneration = ""
}

newStatus.ObservedGeneration = strconv.Itoa(int(c.rollout.Generation))
Expand Down
35 changes: 15 additions & 20 deletions rollout/temlateref.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
Expand Down Expand Up @@ -61,7 +60,6 @@ type informerBasedTemplateResolver struct {
discoClient discovery.DiscoveryInterface
ctx context.Context
cancelContext context.CancelFunc
rolloutWorkQueue workqueue.Interface
rolloutsInformer cache.SharedIndexInformer
argoprojclientset clientset.Interface
}
Expand All @@ -72,7 +70,6 @@ func NewInformerBasedWorkloadRefResolver(
dynamicClient dynamic.Interface,
discoClient discovery.DiscoveryInterface,
agrgoProjClientset clientset.Interface,
rolloutWorkQueue workqueue.Interface,
rolloutsInformer cache.SharedIndexInformer,
) *informerBasedTemplateResolver {
ctx, cancelContext := context.WithCancel(context.TODO())
Expand All @@ -97,7 +94,6 @@ func NewInformerBasedWorkloadRefResolver(
argoprojclientset: agrgoProjClientset,
dynamicClient: dynamicClient,
discoClient: discoClient,
rolloutWorkQueue: rolloutWorkQueue,
rolloutsInformer: rolloutsInformer,
}
}
Expand Down Expand Up @@ -178,11 +174,11 @@ func (r *informerBasedTemplateResolver) Resolve(rollout *v1alpha1.Rollout) error
}

// initialize rollout workload-generation annotation
roMeta, err := meta.Accessor(obj)
workloadMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
generation := strconv.FormatInt(roMeta.GetGeneration(), 10)
generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10)
annotations.SetRolloutWorkloadRefGeneration(rollout, generation)

return nil
Expand Down Expand Up @@ -213,52 +209,51 @@ func (r *informerBasedTemplateResolver) newInformerForGVK(gvk schema.GroupVersio
nil)
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
r.requeueReferencedRollouts(obj, gvk)
r.updateRolloutsReferenceAnnotation(obj, gvk)
},
UpdateFunc: func(oldObj, newObj interface{}) {
r.requeueReferencedRollouts(newObj, gvk)
r.updateRolloutsReferenceAnnotation(newObj, gvk)
},
DeleteFunc: func(obj interface{}) {
r.requeueReferencedRollouts(obj, gvk)
r.updateRolloutsReferenceAnnotation(obj, gvk)
},
})
return informer, nil

}

// requeueReferencedRollouts re-queues all rollouts referenced by given object
func (r *informerBasedTemplateResolver) requeueReferencedRollouts(obj interface{}, gvk schema.GroupVersionKind) {
roMeta, err := meta.Accessor(obj)
// updateRolloutsReferenceAnnotation update the annotation of all rollouts referenced by given object
func (r *informerBasedTemplateResolver) updateRolloutsReferenceAnnotation(obj interface{}, gvk schema.GroupVersionKind) {
workloadMeta, err := meta.Accessor(obj)
if err != nil {
return
}

rollouts, err := r.rolloutsInformer.GetIndexer().ByIndex(templateRefIndexName, refKey(v1alpha1.ObjectRef{
Kind: gvk.Kind,
APIVersion: gvk.GroupVersion().String(),
Name: roMeta.GetName(),
}, roMeta.GetNamespace()))
Name: workloadMeta.GetName(),
}, workloadMeta.GetNamespace()))
if err != nil {
return
}

generation := strconv.FormatInt(roMeta.GetGeneration(), 10)
generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10)
for _, ro := range rollouts {
un, ok := ro.(*unstructured.Unstructured)
if ok {
rollout := unstructuredutil.ObjectToRollout(un)
updated := annotations.SetRolloutWorkloadRefGeneration(rollout, generation)
rollout.Spec.Template.Spec.Containers = []corev1.Container{}
if updated {
rollout.Spec.Template.Spec.Containers = []corev1.Container{}
_, err := r.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(context.TODO(), rollout, v1.UpdateOptions{})
if err != nil {
log.Errorf("Cannot update the workload-ref/annotation for %s/%s", rollout.GetName(), rollout.GetNamespace())
}
}

}

if key, err := cache.MetaNamespaceKeyFunc(ro); err == nil {
r.rolloutWorkQueue.Add(key)
} else {
fmt.Println("DEBUG: ro is not Unstructured:", ro)
}
}
}
Expand Down
54 changes: 30 additions & 24 deletions rollout/temlateref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rollout

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -28,7 +29,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

func newFakeDiscoClient() *discofake.FakeDiscovery {
Expand All @@ -55,8 +55,8 @@ func newFakeDiscoClient() *discofake.FakeDiscovery {

func newResolver(dynamicClient dynamic.Interface, discoveryClient disco.DiscoveryInterface, rolloutClient versioned.Interface) (*informerBasedTemplateResolver, context.CancelFunc) {
rolloutsInformer := rolloutinformers.NewRolloutInformer(rolloutClient, "", time.Minute, cache.Indexers{})
argoprojectclientset := fake.Clientset{}
resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, &argoprojectclientset, workqueue.NewDelayingQueue(), rolloutsInformer)
// argoprojectclientset := fake.Clientset{}
resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, rolloutClient, rolloutsInformer)
stop := make(chan struct{})
go rolloutsInformer.Run(stop)
cache.WaitForCacheSync(stop, rolloutsInformer.HasSynced)
Expand Down Expand Up @@ -330,18 +330,31 @@ func TestRequeueReferencedRollouts(t *testing.T) {
}

rollout := v1alpha1.Rollout{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "Rollout",
},
ObjectMeta: v1.ObjectMeta{
Name: "my-rollout",
Namespace: "default",
},
Spec: v1alpha1.RolloutSpec{
Strategy: v1alpha1.RolloutStrategy{
Canary: &v1alpha1.CanaryStrategy{},
},
WorkloadRef: &v1alpha1.ObjectRef{
Name: "my-deployment",
Kind: "Deployment",
APIVersion: "apps/v1",
},
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"test": "app",
},
},
},
}

rolloutsClient := fake.NewSimpleClientset(&rollout)

discoveryClient := newFakeDiscoClient()
Expand All @@ -351,30 +364,24 @@ func TestRequeueReferencedRollouts(t *testing.T) {

err := resolver.Resolve(&rollout)
require.NoError(t, err)
assert.True(t, len(rollout.GetAnnotations()) > 0, "Number of annotatioin must be > 0 after resolve")
// manually apply the update
rolloutsClient.ArgoprojV1alpha1().Rollouts("default").Update(context.TODO(), &rollout, v1.UpdateOptions{})

// Update the deployment
deploymentsClient := dynamicClient.Resource(appsv1.SchemeGroupVersion.WithResource("deployments")).Namespace("default")

_, err = deploymentsClient.Update(context.TODO(), mustToUnstructured(deployment), v1.UpdateOptions{})
require.NoError(t, err)

go func() {
// shutdown queue to make sure test fails if requeue functionality is broken
time.Sleep(5 * time.Second)
resolver.rolloutWorkQueue.ShutDown()
}()

item, done := resolver.rolloutWorkQueue.Get()
require.False(t, done)
assert.Equal(t, "default/my-rollout", item)
resolver.rolloutWorkQueue.Done(item)
ro, _ := rolloutsClient.ArgoprojV1alpha1().Rollouts("default").Get(context.TODO(), "my-rollout", v1.GetOptions{})
assert.NotNil(t, ro)
assert.True(t, len(ro.GetAnnotations()) > 0, "Number of annotatioin must be > 0")
fmt.Println("DEBUG:", ro.GetAnnotations())

err = deploymentsClient.Delete(context.TODO(), deployment.Name, v1.DeleteOptions{})
require.NoError(t, err)

item, done = resolver.rolloutWorkQueue.Get()
require.False(t, done)
assert.Equal(t, "default/my-rollout", item)
resolver.rolloutWorkQueue.Done(item)
/*
err = deploymentsClient.Delete(context.TODO(), deployment.Name, v1.DeleteOptions{})
require.NoError(t, err)
*/
}

func TestRequeueReferencedRollouts_InvalidMeta(t *testing.T) {
Expand All @@ -383,9 +390,8 @@ func TestRequeueReferencedRollouts_InvalidMeta(t *testing.T) {
resolver, cancel := newResolver(dynamicClient, discoveryClient, fake.NewSimpleClientset())
defer cancel()

resolver.requeueReferencedRollouts(nil, schema.GroupVersionKind{})

assert.Equal(t, 0, resolver.rolloutWorkQueue.Len())
resolver.updateRolloutsReferenceAnnotation(nil, schema.GroupVersionKind{})
// TODO: verify annotation is unchanged for ro
}

func TestResolveNotRef(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ spec:
When().
PromoteRollout().
WaitForRolloutStatus("Degraded").
WaitForRolloutStatus("Healthy", time.Second*90)
WaitForRolloutStatus("Healthy")
}

// TestCanaryScaleDownDelay verifies canary uses a scaleDownDelay when traffic routing is used,
Expand Down

0 comments on commit 8bd3ab6

Please sign in to comment.