diff --git a/rollout/controller.go b/rollout/controller.go index ebc17c1303..1d7d8cde92 100644 --- a/rollout/controller.go +++ b/rollout/controller.go @@ -132,6 +132,7 @@ type reconcilerBase struct { replicaSetSynced cache.InformerSynced rolloutsInformer cache.SharedIndexInformer rolloutsLister listers.RolloutLister + replicaSetInformer cache.SharedIndexInformer rolloutsSynced cache.InformerSynced rolloutsIndexer cache.Indexer servicesLister v1.ServiceLister @@ -176,7 +177,6 @@ func NewController(cfg ControllerConfig) *Controller { controllerutil.EnqueueAfter(obj, duration, cfg.RolloutWorkQueue) }, } - base := reconcilerBase{ kubeclientset: cfg.KubeClientSet, argoprojclientset: cfg.ArgoProjClientset, @@ -185,6 +185,7 @@ func NewController(cfg ControllerConfig) *Controller { replicaSetLister: cfg.ReplicaSetInformer.Lister(), replicaSetSynced: cfg.ReplicaSetInformer.Informer().HasSynced, rolloutsInformer: cfg.RolloutsInformer.Informer(), + replicaSetInformer: cfg.ReplicaSetInformer.Informer(), rolloutsIndexer: cfg.RolloutsInformer.Informer().GetIndexer(), rolloutsLister: cfg.RolloutsInformer.Lister(), rolloutsSynced: cfg.RolloutsInformer.Informer().HasSynced, diff --git a/rollout/replicaset.go b/rollout/replicaset.go index dceff65aa0..fad23e756e 100644 --- a/rollout/replicaset.go +++ b/rollout/replicaset.go @@ -35,6 +35,7 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error { _, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{}) if err == nil { c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name) + c.replicaSetInformer.GetIndexer().Update(rs) } return err } @@ -56,9 +57,10 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay } deadline := timeutil.MetaNow().Add(scaleDownDelaySeconds).UTC().Format(time.RFC3339) patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, deadline) - _, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{}) + rs, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{}) if err == nil { c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds) + c.replicaSetInformer.GetIndexer().Update(rs) } return err } diff --git a/rollout/replicaset_test.go b/rollout/replicaset_test.go index 26b05b5b54..10c1dc0893 100644 --- a/rollout/replicaset_test.go +++ b/rollout/replicaset_test.go @@ -195,16 +195,29 @@ func TestReconcileNewReplicaSet(t *testing.T) { rollout := newBlueGreenRollout("foo", test.rolloutReplicas, nil, "", "") fake := fake.Clientset{} k8sfake := k8sfake.Clientset{} + + f := newFixture(t) + defer f.Close() + f.objects = append(f.objects, rollout) + f.replicaSetLister = append(f.replicaSetLister, oldRS, newRS) + f.kubeobjects = append(f.kubeobjects, oldRS, newRS) + _, informers, k8sInformer := f.newController(noResyncPeriodFunc) + stopCh := make(chan struct{}) + informers.Start(stopCh) + informers.WaitForCacheSync(stopCh) + close(stopCh) + roCtx := rolloutContext{ log: logutil.WithRollout(rollout), rollout: rollout, newRS: newRS, stableRS: oldRS, reconcilerBase: reconcilerBase{ - argoprojclientset: &fake, - kubeclientset: &k8sfake, - recorder: record.NewFakeEventRecorder(), - resyncPeriod: 30 * time.Second, + argoprojclientset: &fake, + kubeclientset: &k8sfake, + recorder: record.NewFakeEventRecorder(), + resyncPeriod: 30 * time.Second, + replicaSetInformer: k8sInformer.Apps().V1().ReplicaSets().Informer(), }, pauseContext: &pauseContext{ rollout: rollout, diff --git a/rollout/sync.go b/rollout/sync.go index f13a3489c7..25c0c14813 100644 --- a/rollout/sync.go +++ b/rollout/sync.go @@ -85,7 +85,14 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) { if annotationsUpdated || minReadySecondsNeedsUpdate || affinityNeedsUpdate { rsCopy.Spec.MinReadySeconds = c.rollout.Spec.MinReadySeconds rsCopy.Spec.Template.Spec.Affinity = replicasetutil.GenerateReplicaSetAffinity(*c.rollout) - return c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) + rs, err := c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) + if err != nil { + c.log.WithError(err).Error("Error: updating replicaset revision") + return nil, fmt.Errorf("error updating replicaset revision: %v", err) + } + c.log.Infof("Synced revision on ReplicaSet '%s' to '%s'", rs.Name, newRevision) + c.replicaSetInformer.GetIndexer().Update(rs) + return rs, nil } // Should use the revision in existingNewRS's annotation, since it set by before @@ -370,6 +377,7 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32, scaled = true revision, _ := replicasetutil.Revision(rs) c.recorder.Eventf(rollout, record.EventOptions{EventReason: conditions.ScalingReplicaSetReason}, conditions.ScalingReplicaSetMessage, scalingOperation, rs.Name, revision, oldScale, newScale) + c.replicaSetInformer.GetIndexer().Update(rs) } } return scaled, rs, err