diff --git a/controller/controller.go b/controller/controller.go index 945f80b347..25fa6c0c77 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -252,9 +252,9 @@ func NewManager( return cm } -// Run will sync informer caches and start controllers. It will block until stopCh -// is closed, at which point it will shutdown the workqueue and wait for -// controllers to finish processing their current work items. +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// workers to finish processing their current work items. func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, ingressThreadiness, experimentThreadiness, analysisThreadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.serviceWorkqueue.ShutDown() diff --git a/manifests/crds/rollout-crd.yaml b/manifests/crds/rollout-crd.yaml index bceef3edf5..4ca6f2308d 100644 --- a/manifests/crds/rollout-crd.yaml +++ b/manifests/crds/rollout-crd.yaml @@ -64,6 +64,8 @@ spec: revisionHistoryLimit: format: int32 type: integer + scaleDownOnAbort: + type: boolean selector: properties: matchExpressions: diff --git a/manifests/install.yaml b/manifests/install.yaml index 3eb0ed9241..21617077d1 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -9753,6 +9753,8 @@ spec: revisionHistoryLimit: format: int32 type: integer + scaleDownOnAbort: + type: boolean selector: properties: matchExpressions: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index e287817f07..eec18188cd 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -9753,6 +9753,8 @@ spec: revisionHistoryLimit: format: int32 type: integer + scaleDownOnAbort: + type: boolean selector: properties: matchExpressions: diff --git a/pkg/apiclient/rollout/rollout.swagger.json b/pkg/apiclient/rollout/rollout.swagger.json index f984d322c2..d1cb053328 100644 --- a/pkg/apiclient/rollout/rollout.swagger.json +++ b/pkg/apiclient/rollout/rollout.swagger.json @@ -1155,6 +1155,10 @@ "restartAt": { "$ref": "#/definitions/k8s.io.apimachinery.pkg.apis.meta.v1.Time", "title": "RestartAt indicates when all the pods of a Rollout should be restarted" + }, + "scaleDownOnAbort": { + "type": "boolean", + "title": "ScaleDownOnAbort scales down" } }, "title": "RolloutSpec is the spec for a Rollout resource" diff --git a/pkg/apis/rollouts/v1alpha1/types.go b/pkg/apis/rollouts/v1alpha1/types.go index 4836ef4026..272cd90fcf 100644 --- a/pkg/apis/rollouts/v1alpha1/types.go +++ b/pkg/apis/rollouts/v1alpha1/types.go @@ -71,6 +71,8 @@ type RolloutSpec struct { ProgressDeadlineSeconds *int32 `json:"progressDeadlineSeconds,omitempty" protobuf:"varint,8,opt,name=progressDeadlineSeconds"` // RestartAt indicates when all the pods of a Rollout should be restarted RestartAt *metav1.Time `json:"restartAt,omitempty" protobuf:"bytes,9,opt,name=restartAt"` + // ScaleDownOnAbort scales down + ScaleDownOnAbort bool `json:"scaleDownOnAbort,omitempty" protobuf:"varint,11,opt,name=scaleDownOnAbort"` } func (s *RolloutSpec) SetResolvedSelector(selector *metav1.LabelSelector) { diff --git a/rollout/bluegreen.go b/rollout/bluegreen.go index 4fb7b2beba..b7e370fa08 100644 --- a/rollout/bluegreen.go +++ b/rollout/bluegreen.go @@ -30,6 +30,32 @@ func (c *rolloutContext) rolloutBlueGreen() error { return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc) } + /* + if c.newRS != nil { + c.log.Infof("hkang: reconcileBlueGreenReplicaSets, newRS '%s'", c.newRS.Name) + if c.scaleDownOnAbort { + if c.pauseContext.IsAborted() { + c.log.Info("hkang: ctx status is aborted") + _, ok := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] + if !ok { + c.log.Info("hkang: addScaleDownDelay") + err := c.addScaleDownDelay(c.newRS) + if err != nil { + return err + } + return err + } else { + c.log.Info("hkang: addScaleDownDelay already set") + } + } else { + c.log.Info("hkang: ctx status is not aborted") + } + } + } else { + c.log.Info("hkang: newRS is nil") + } + */ + err = c.podRestarter.Reconcile(c) if err != nil { return err @@ -98,6 +124,34 @@ func (c *rolloutContext) reconcileBlueGreenReplicaSets(activeSvc *corev1.Service if err != nil { return err } + + // Scale down newRS if aborted + /* + if c.newRS != nil { + c.log.Infof("hkang: reconcileBlueGreenReplicaSets, newRS '%s'", c.newRS.Name) + if c.scaleDownOnAbort { + if c.pauseContext.IsAborted() { + c.log.Info("hkang: ctx status is aborted") + _, ok := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] + if !ok { + c.log.Info("hkang: addScaleDownDelay") + err := c.addScaleDownDelay(c.newRS) + if err != nil { + return err + } + return err + } else { + c.log.Info("hkang: addScaleDownDelay already set") + } + } else { + c.log.Info("hkang: ctx status is not aborted") + } + } + } else { + c.log.Info("hkang: newRS is nil") + } + */ + if err := c.reconcileRevisionHistoryLimit(c.otherRSs); err != nil { return err } @@ -318,7 +372,7 @@ func (c *rolloutContext) syncRolloutStatusBlueGreen(previewSvc *corev1.Service, return c.persistRolloutStatus(&newStatus) } -// calculateScaleUpPreviewCheckPoint calculates the correct value of status.blueGreen.scaleUpPreviewCheckPoint +// calculateScaleUpPreviewCheckPoint calculates the correct value of status.blueGreen.scaleUpP reviewCheckPoint // which is used by the blueGreen.previewReplicaCount feature. scaleUpPreviewCheckPoint is a single // direction trip-wire, initialized to false, and gets flipped true as soon as the preview replicas // matches scaleUpPreviewCheckPoint and prePromotionAnalysis (if used) completes. It get reset to diff --git a/rollout/context.go b/rollout/context.go index f21eaf4c8d..7cbcff85f9 100644 --- a/rollout/context.go +++ b/rollout/context.go @@ -46,6 +46,8 @@ type rolloutContext struct { // we only perform weight verification when we are at a setWeight step since we do not want to // continually verify weight in case it could incur rate-limiting or other expenses. weightVerified *bool + + // scaleDownOnAbort bool } func (c *rolloutContext) reconcile() error { diff --git a/rollout/controller.go b/rollout/controller.go index ca2dd141b5..87ed46a743 100644 --- a/rollout/controller.go +++ b/rollout/controller.go @@ -454,6 +454,7 @@ func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutConte log: logCtx, }, reconcilerBase: c.reconcilerBase, + // scaleDownOnAbort: false, // switch to true for dev } return &roCtx, nil } diff --git a/rollout/replicaset.go b/rollout/replicaset.go index d3d30f0718..1556127ee4 100644 --- a/rollout/replicaset.go +++ b/rollout/replicaset.go @@ -94,10 +94,17 @@ func (c *Controller) getReplicaSetsForRollouts(r *v1alpha1.Rollout) ([]*appsv1.R // removeScaleDownDelays removes the scale-down-deadline annotation from the new/stable ReplicaSets, // in the event that we moved back to an older revision that is still within its scaleDownDelay. func (c *rolloutContext) removeScaleDownDeadlines() error { + c.log.Info("hkang removeScaleDownDeadlines") var toRemove []*appsv1.ReplicaSet + // if c.newRS != nil && !c.pauseContext.IsAborted() { if c.newRS != nil { + c.log.Info("hkang remove scaleDown annotation in newRS") toRemove = append(toRemove, c.newRS) } + //} else { + c.log.Info("hkang DON'T remove scaleDown annotation in newRS") + //} + if c.stableRS != nil { if len(toRemove) == 0 || c.stableRS.Name != c.newRS.Name { toRemove = append(toRemove, c.stableRS) @@ -116,14 +123,53 @@ func (c *rolloutContext) reconcileNewReplicaSet() (bool, error) { if c.newRS == nil { return false, nil } - newReplicasCount, err := replicasetutil.NewRSNewReplicas(c.rollout, c.allRSs, c.newRS) + newReplicasCount := int32(0) + newIntendedReplicasCount, err := replicasetutil.NewRSNewReplicas(c.rollout, c.allRSs, c.newRS) if err != nil { return false, err } + + // if newRS is set to scaled down + scaleDownAtStr, scaleDownAnnotationAvailable := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] + c.log.Infof("hkang reconcileNewReplicaset: scaledown %v (%v)", scaleDownAtStr, scaleDownAnnotationAvailable) + // if scaleDownAnnotationAvailable && c.scaleDownOnAbort { + if c.pauseContext.IsAborted() && c.rollout.Spec.ScaleDownOnAbort { + c.log.Infof("hkang scaledown on abort '%s'", c.newRS.Name) + /* + scaleDownAtTime, err := time.Parse(time.RFC3339, scaleDownAtStr) + if err != nil { + c.log.Infof("hkang No scaleDownAt label on new rs '%s'", c.newRS.Name) + } + newReplicasCount = c.determineReplicaSetCountWhenScaling(c.newRS, newReplicasCount, newReplicasCount, scaleDownAtTime) + */ + } else { + newReplicasCount = newIntendedReplicasCount + } + + c.log.Infof("hkang scaleReplicaSetAndRecordEvent '%d'", newReplicasCount) scaled, _, err := c.scaleReplicaSetAndRecordEvent(c.newRS, newReplicasCount) return scaled, err } +// helper to handle the replica set scale down time +// if the scale down time has not passed, enqueue the rollout +func (c *rolloutContext) determineReplicaSetCountWhenScaling(targetRS *appsv1.ReplicaSet, defaultReplicaCount int32, desiredReplicaCount int32, scaleDownAtTime time.Time) int32 { + now := metav1.Now() + scaleDownAt := metav1.NewTime(scaleDownAtTime) + if scaleDownAt.After(now.Time) { + remainingTime := scaleDownAt.Sub(now.Time) + if remainingTime < c.resyncPeriod { + // if the scale down time has not passed, enqueue the rollout + c.log.Infof("hkang RS '%s' has not reached the scaleDownTime(remainingTime %s)", targetRS.Name, remainingTime) + c.enqueueRolloutAfter(c.rollout, remainingTime) + } + // set the replica count to be the desired replica count + return desiredReplicaCount + } + // if it's after the scale down time + return defaultReplicaCount +} + // reconcileOtherReplicaSets reconciles "other" ReplicaSets. // Other ReplicaSets are ReplicaSets are neither the new or stable (allRSs - newRS - stableRS) func (c *rolloutContext) reconcileOtherReplicaSets() (bool, error) { diff --git a/utils/replicaset/canary.go b/utils/replicaset/canary.go index 701c842a26..3a1e076170 100644 --- a/utils/replicaset/canary.go +++ b/utils/replicaset/canary.go @@ -297,6 +297,11 @@ func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout) (*int32, int32) { return nil, 100 } if scs := UseSetCanaryScale(rollout); scs != nil { + if rollout.Status.Abort && rollout.Spec.ScaleDownOnAbort { + log.Info("hkang ScaleDownOnAbort is true") + return nil, 0 + } + if scs.Replicas != nil { return scs.Replicas, 0 } else if scs.Weight != nil {