diff --git a/controller/controller.go b/controller/controller.go index da9af33186..d007fc5436 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -146,7 +146,7 @@ func NewManager( serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services") ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses") - refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutWorkqueue, rolloutsInformer.Informer()) + refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer()) apiFactory := api.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), secretInformer.Informer(), configMapInformer.Informer()) recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, apiFactory) notificationsController := controller.NewController(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory, diff --git a/docs/migrating.md b/docs/migrating.md index 074b31827d..47a2861e3e 100644 --- a/docs/migrating.md +++ b/docs/migrating.md @@ -108,6 +108,8 @@ After creation Rollout will spinup required number of Pods side-by-side with the Rollout won't try to manage existing Deployment Pods. That means you can safely update add Rollout to the production environment without any interruption but you are going to run twice more Pods during migration. +Argo-rollouts controller patches the spec of rollout object with an annotation of `rollout.argoproj.io/workload-generation`, which equals the generation of referenced deployment. Users can detect if the rollout matches desired generation of deployment by checking the `workloadObservedGeneration` in the rollout status. + **Traffic Management During Migration** The Rollout offers traffic management functionality that manages routing rules and flows the traffic to different diff --git a/rollout/sync.go b/rollout/sync.go index 3aefac3ab5..1ad02a35ba 100644 --- a/rollout/sync.go +++ b/rollout/sync.go @@ -697,6 +697,8 @@ func (c *rolloutContext) persistRolloutStatus(newStatus *v1alpha1.RolloutStatus) if workloadRefObservation != int32(currentWorkloadObservedGeneration) { newStatus.WorkloadObservedGeneration = strconv.Itoa(int(workloadRefObservation)) } + } else { + newStatus.WorkloadObservedGeneration = "" } newStatus.ObservedGeneration = strconv.Itoa(int(c.rollout.Generation)) diff --git a/rollout/temlateref.go b/rollout/temlateref.go index f2d498c9a4..f7f82e5baf 100644 --- a/rollout/temlateref.go +++ b/rollout/temlateref.go @@ -25,7 +25,6 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) const ( @@ -61,7 +60,6 @@ type informerBasedTemplateResolver struct { discoClient discovery.DiscoveryInterface ctx context.Context cancelContext context.CancelFunc - rolloutWorkQueue workqueue.Interface rolloutsInformer cache.SharedIndexInformer argoprojclientset clientset.Interface } @@ -72,7 +70,6 @@ func NewInformerBasedWorkloadRefResolver( dynamicClient dynamic.Interface, discoClient discovery.DiscoveryInterface, agrgoProjClientset clientset.Interface, - rolloutWorkQueue workqueue.Interface, rolloutsInformer cache.SharedIndexInformer, ) *informerBasedTemplateResolver { ctx, cancelContext := context.WithCancel(context.TODO()) @@ -97,7 +94,6 @@ func NewInformerBasedWorkloadRefResolver( argoprojclientset: agrgoProjClientset, dynamicClient: dynamicClient, discoClient: discoClient, - rolloutWorkQueue: rolloutWorkQueue, rolloutsInformer: rolloutsInformer, } } @@ -178,11 +174,11 @@ func (r *informerBasedTemplateResolver) Resolve(rollout *v1alpha1.Rollout) error } // initialize rollout workload-generation annotation - roMeta, err := meta.Accessor(obj) + workloadMeta, err := meta.Accessor(obj) if err != nil { return err } - generation := strconv.FormatInt(roMeta.GetGeneration(), 10) + generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10) annotations.SetRolloutWorkloadRefGeneration(rollout, generation) return nil @@ -213,52 +209,51 @@ func (r *informerBasedTemplateResolver) newInformerForGVK(gvk schema.GroupVersio nil) informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - r.requeueReferencedRollouts(obj, gvk) + r.updateRolloutsReferenceAnnotation(obj, gvk) }, UpdateFunc: func(oldObj, newObj interface{}) { - r.requeueReferencedRollouts(newObj, gvk) + r.updateRolloutsReferenceAnnotation(newObj, gvk) }, DeleteFunc: func(obj interface{}) { - r.requeueReferencedRollouts(obj, gvk) + r.updateRolloutsReferenceAnnotation(obj, gvk) }, }) return informer, nil } -// requeueReferencedRollouts re-queues all rollouts referenced by given object -func (r *informerBasedTemplateResolver) requeueReferencedRollouts(obj interface{}, gvk schema.GroupVersionKind) { - roMeta, err := meta.Accessor(obj) +// updateRolloutsReferenceAnnotation update the annotation of all rollouts referenced by given object +func (r *informerBasedTemplateResolver) updateRolloutsReferenceAnnotation(obj interface{}, gvk schema.GroupVersionKind) { + workloadMeta, err := meta.Accessor(obj) if err != nil { return } + rollouts, err := r.rolloutsInformer.GetIndexer().ByIndex(templateRefIndexName, refKey(v1alpha1.ObjectRef{ Kind: gvk.Kind, APIVersion: gvk.GroupVersion().String(), - Name: roMeta.GetName(), - }, roMeta.GetNamespace())) + Name: workloadMeta.GetName(), + }, workloadMeta.GetNamespace())) if err != nil { return } - generation := strconv.FormatInt(roMeta.GetGeneration(), 10) + generation := strconv.FormatInt(workloadMeta.GetGeneration(), 10) for _, ro := range rollouts { un, ok := ro.(*unstructured.Unstructured) if ok { rollout := unstructuredutil.ObjectToRollout(un) updated := annotations.SetRolloutWorkloadRefGeneration(rollout, generation) - rollout.Spec.Template.Spec.Containers = []corev1.Container{} if updated { + rollout.Spec.Template.Spec.Containers = []corev1.Container{} _, err := r.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(context.TODO(), rollout, v1.UpdateOptions{}) if err != nil { log.Errorf("Cannot update the workload-ref/annotation for %s/%s", rollout.GetName(), rollout.GetNamespace()) } } - } - - if key, err := cache.MetaNamespaceKeyFunc(ro); err == nil { - r.rolloutWorkQueue.Add(key) + } else { + fmt.Println("DEBUG: ro is not Unstructured:", ro) } } } diff --git a/rollout/temlateref_test.go b/rollout/temlateref_test.go index 54b6a6c9dc..87d3b0696c 100644 --- a/rollout/temlateref_test.go +++ b/rollout/temlateref_test.go @@ -2,6 +2,7 @@ package rollout import ( "context" + "fmt" "testing" "time" @@ -28,7 +29,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) func newFakeDiscoClient() *discofake.FakeDiscovery { @@ -55,8 +55,8 @@ func newFakeDiscoClient() *discofake.FakeDiscovery { func newResolver(dynamicClient dynamic.Interface, discoveryClient disco.DiscoveryInterface, rolloutClient versioned.Interface) (*informerBasedTemplateResolver, context.CancelFunc) { rolloutsInformer := rolloutinformers.NewRolloutInformer(rolloutClient, "", time.Minute, cache.Indexers{}) - argoprojectclientset := fake.Clientset{} - resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, &argoprojectclientset, workqueue.NewDelayingQueue(), rolloutsInformer) + // argoprojectclientset := fake.Clientset{} + resolver := NewInformerBasedWorkloadRefResolver("", dynamicClient, discoveryClient, rolloutClient, rolloutsInformer) stop := make(chan struct{}) go rolloutsInformer.Run(stop) cache.WaitForCacheSync(stop, rolloutsInformer.HasSynced) @@ -330,18 +330,31 @@ func TestRequeueReferencedRollouts(t *testing.T) { } rollout := v1alpha1.Rollout{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "Rollout", + }, ObjectMeta: v1.ObjectMeta{ Name: "my-rollout", Namespace: "default", }, Spec: v1alpha1.RolloutSpec{ + Strategy: v1alpha1.RolloutStrategy{ + Canary: &v1alpha1.CanaryStrategy{}, + }, WorkloadRef: &v1alpha1.ObjectRef{ Name: "my-deployment", Kind: "Deployment", APIVersion: "apps/v1", }, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "app", + }, + }, }, } + rolloutsClient := fake.NewSimpleClientset(&rollout) discoveryClient := newFakeDiscoClient() @@ -351,30 +364,24 @@ func TestRequeueReferencedRollouts(t *testing.T) { err := resolver.Resolve(&rollout) require.NoError(t, err) + assert.True(t, len(rollout.GetAnnotations()) > 0, "Number of annotatioin must be > 0 after resolve") + // manually apply the update + rolloutsClient.ArgoprojV1alpha1().Rollouts("default").Update(context.TODO(), &rollout, v1.UpdateOptions{}) + // Update the deployment deploymentsClient := dynamicClient.Resource(appsv1.SchemeGroupVersion.WithResource("deployments")).Namespace("default") - _, err = deploymentsClient.Update(context.TODO(), mustToUnstructured(deployment), v1.UpdateOptions{}) require.NoError(t, err) - go func() { - // shutdown queue to make sure test fails if requeue functionality is broken - time.Sleep(5 * time.Second) - resolver.rolloutWorkQueue.ShutDown() - }() - - item, done := resolver.rolloutWorkQueue.Get() - require.False(t, done) - assert.Equal(t, "default/my-rollout", item) - resolver.rolloutWorkQueue.Done(item) + ro, _ := rolloutsClient.ArgoprojV1alpha1().Rollouts("default").Get(context.TODO(), "my-rollout", v1.GetOptions{}) + assert.NotNil(t, ro) + assert.True(t, len(ro.GetAnnotations()) > 0, "Number of annotatioin must be > 0") + fmt.Println("DEBUG:", ro.GetAnnotations()) - err = deploymentsClient.Delete(context.TODO(), deployment.Name, v1.DeleteOptions{}) - require.NoError(t, err) - - item, done = resolver.rolloutWorkQueue.Get() - require.False(t, done) - assert.Equal(t, "default/my-rollout", item) - resolver.rolloutWorkQueue.Done(item) + /* + err = deploymentsClient.Delete(context.TODO(), deployment.Name, v1.DeleteOptions{}) + require.NoError(t, err) + */ } func TestRequeueReferencedRollouts_InvalidMeta(t *testing.T) { @@ -383,9 +390,8 @@ func TestRequeueReferencedRollouts_InvalidMeta(t *testing.T) { resolver, cancel := newResolver(dynamicClient, discoveryClient, fake.NewSimpleClientset()) defer cancel() - resolver.requeueReferencedRollouts(nil, schema.GroupVersionKind{}) - - assert.Equal(t, 0, resolver.rolloutWorkQueue.Len()) + resolver.updateRolloutsReferenceAnnotation(nil, schema.GroupVersionKind{}) + // TODO: verify annotation is unchanged for ro } func TestResolveNotRef(t *testing.T) { diff --git a/test/e2e/canary_test.go b/test/e2e/canary_test.go index dcbc1687b8..9c0158f4e3 100644 --- a/test/e2e/canary_test.go +++ b/test/e2e/canary_test.go @@ -407,7 +407,7 @@ spec: When(). PromoteRollout(). WaitForRolloutStatus("Degraded"). - WaitForRolloutStatus("Healthy", time.Second*90) + WaitForRolloutStatus("Healthy") } // TestCanaryScaleDownDelay verifies canary uses a scaleDownDelay when traffic routing is used,