Skip to content

Commit

Permalink
feat: scaleDownOnAbort for rollout strategies
Browse files Browse the repository at this point in the history
- if an update is aborted, the preview RS and canary RS will be
  scaled down.
- scaleDownOnAbort is false by default
- updated doc

Signed-off-by: Hui Kang <[email protected]>
  • Loading branch information
Hui Kang committed May 11, 2021
1 parent 0b24f26 commit f127146
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 5 deletions.
6 changes: 3 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ func NewManager(
return cm
}

// Run will sync informer caches and start controllers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// controllers to finish processing their current work items.
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, ingressThreadiness, experimentThreadiness, analysisThreadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.serviceWorkqueue.ShutDown()
Expand Down
2 changes: 2 additions & 0 deletions manifests/crds/rollout-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ spec:
revisionHistoryLimit:
format: int32
type: integer
scaleDownOnAbort:
type: boolean
selector:
properties:
matchExpressions:
Expand Down
2 changes: 2 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9753,6 +9753,8 @@ spec:
revisionHistoryLimit:
format: int32
type: integer
scaleDownOnAbort:
type: boolean
selector:
properties:
matchExpressions:
Expand Down
2 changes: 2 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9753,6 +9753,8 @@ spec:
revisionHistoryLimit:
format: int32
type: integer
scaleDownOnAbort:
type: boolean
selector:
properties:
matchExpressions:
Expand Down
4 changes: 4 additions & 0 deletions pkg/apiclient/rollout/rollout.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,10 @@
"restartAt": {
"$ref": "#/definitions/k8s.io.apimachinery.pkg.apis.meta.v1.Time",
"title": "RestartAt indicates when all the pods of a Rollout should be restarted"
},
"scaleDownOnAbort": {
"type": "boolean",
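"title": "ScaleDownOnAbort indicates whether the new ReplicaSet (canary or preview) is scaled down when the rollout is aborted. Defaults to false."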
}
},
"title": "RolloutSpec is the spec for a Rollout resource"
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/rollouts/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type RolloutSpec struct {
ProgressDeadlineSeconds *int32 `json:"progressDeadlineSeconds,omitempty" protobuf:"varint,8,opt,name=progressDeadlineSeconds"`
// RestartAt indicates when all the pods of a Rollout should be restarted
RestartAt *metav1.Time `json:"restartAt,omitempty" protobuf:"bytes,9,opt,name=restartAt"`
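// ScaleDownOnAbort indicates whether the new ReplicaSet (canary or preview) is scaled down when the rollout is aborted. Defaults to false.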
ScaleDownOnAbort bool `json:"scaleDownOnAbort,omitempty" protobuf:"varint,11,opt,name=scaleDownOnAbort"`
}

func (s *RolloutSpec) SetResolvedSelector(selector *metav1.LabelSelector) {
Expand Down
56 changes: 55 additions & 1 deletion rollout/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ func (c *rolloutContext) rolloutBlueGreen() error {
return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc)
}

/*
if c.newRS != nil {
c.log.Infof("hkang: reconcileBlueGreenReplicaSets, newRS '%s'", c.newRS.Name)
if c.scaleDownOnAbort {
if c.pauseContext.IsAborted() {
c.log.Info("hkang: ctx status is aborted")
_, ok := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
if !ok {
c.log.Info("hkang: addScaleDownDelay")
err := c.addScaleDownDelay(c.newRS)
if err != nil {
return err
}
return err
} else {
c.log.Info("hkang: addScaleDownDelay already set")
}
} else {
c.log.Info("hkang: ctx status is not aborted")
}
}
} else {
c.log.Info("hkang: newRS is nil")
}
*/

err = c.podRestarter.Reconcile(c)
if err != nil {
return err
Expand Down Expand Up @@ -98,6 +124,34 @@ func (c *rolloutContext) reconcileBlueGreenReplicaSets(activeSvc *corev1.Service
if err != nil {
return err
}

// Scale down newRS if aborted
/*
if c.newRS != nil {
c.log.Infof("hkang: reconcileBlueGreenReplicaSets, newRS '%s'", c.newRS.Name)
if c.scaleDownOnAbort {
if c.pauseContext.IsAborted() {
c.log.Info("hkang: ctx status is aborted")
_, ok := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
if !ok {
c.log.Info("hkang: addScaleDownDelay")
err := c.addScaleDownDelay(c.newRS)
if err != nil {
return err
}
return err
} else {
c.log.Info("hkang: addScaleDownDelay already set")
}
} else {
c.log.Info("hkang: ctx status is not aborted")
}
}
} else {
c.log.Info("hkang: newRS is nil")
}
*/

if err := c.reconcileRevisionHistoryLimit(c.otherRSs); err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +372,7 @@ func (c *rolloutContext) syncRolloutStatusBlueGreen(previewSvc *corev1.Service,
return c.persistRolloutStatus(&newStatus)
}

// calculateScaleUpPreviewCheckPoint calculates the correct value of status.blueGreen.scaleUpPreviewCheckPoint
// calculateScaleUpPreviewCheckPoint calculates the correct value of status.blueGreen.scaleUpPreviewCheckPoint
// which is used by the blueGreen.previewReplicaCount feature. scaleUpPreviewCheckPoint is a single
// direction trip-wire, initialized to false, and gets flipped true as soon as the preview replicas
// matches scaleUpPreviewCheckPoint and prePromotionAnalysis (if used) completes. It get reset to
Expand Down
2 changes: 2 additions & 0 deletions rollout/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type rolloutContext struct {
// we only perform weight verification when we are at a setWeight step since we do not want to
// continually verify weight in case it could incur rate-limiting or other expenses.
weightVerified *bool

// scaleDownOnAbort bool
}

func (c *rolloutContext) reconcile() error {
Expand Down
1 change: 1 addition & 0 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutConte
log: logCtx,
},
reconcilerBase: c.reconcilerBase,
// scaleDownOnAbort: false, // switch to true for dev
}
return &roCtx, nil
}
Expand Down
48 changes: 47 additions & 1 deletion rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,17 @@ func (c *Controller) getReplicaSetsForRollouts(r *v1alpha1.Rollout) ([]*appsv1.R
// removeScaleDownDeadlines removes the scale-down-deadline annotation from the new/stable ReplicaSets,
// in the event that we moved back to an older revision that is still within its scaleDownDelay.
func (c *rolloutContext) removeScaleDownDeadlines() error {
c.log.Info("hkang removeScaleDownDeadlines")
var toRemove []*appsv1.ReplicaSet
// if c.newRS != nil && !c.pauseContext.IsAborted() {
if c.newRS != nil {
c.log.Info("hkang remove scaleDown annotation in newRS")
toRemove = append(toRemove, c.newRS)
}
//} else {
c.log.Info("hkang DON'T remove scaleDown annotation in newRS")
//}

if c.stableRS != nil {
if len(toRemove) == 0 || c.stableRS.Name != c.newRS.Name {
toRemove = append(toRemove, c.stableRS)
Expand All @@ -116,14 +123,53 @@ func (c *rolloutContext) reconcileNewReplicaSet() (bool, error) {
if c.newRS == nil {
return false, nil
}
newReplicasCount, err := replicasetutil.NewRSNewReplicas(c.rollout, c.allRSs, c.newRS)
newReplicasCount := int32(0)
newIntendedReplicasCount, err := replicasetutil.NewRSNewReplicas(c.rollout, c.allRSs, c.newRS)
if err != nil {
return false, err
}

// if newRS is set to scaled down
scaleDownAtStr, scaleDownAnnotationAvailable := c.newRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
c.log.Infof("hkang reconcileNewReplicaset: scaledown %v (%v)", scaleDownAtStr, scaleDownAnnotationAvailable)
// if scaleDownAnnotationAvailable && c.scaleDownOnAbort {
if c.pauseContext.IsAborted() && c.rollout.Spec.ScaleDownOnAbort {
c.log.Infof("hkang scaledown on abort '%s'", c.newRS.Name)
/*
scaleDownAtTime, err := time.Parse(time.RFC3339, scaleDownAtStr)
if err != nil {
c.log.Infof("hkang No scaleDownAt label on new rs '%s'", c.newRS.Name)
}
newReplicasCount = c.determineReplicaSetCountWhenScaling(c.newRS, newReplicasCount, newReplicasCount, scaleDownAtTime)
*/
} else {
newReplicasCount = newIntendedReplicasCount
}

c.log.Infof("hkang scaleReplicaSetAndRecordEvent '%d'", newReplicasCount)
scaled, _, err := c.scaleReplicaSetAndRecordEvent(c.newRS, newReplicasCount)
return scaled, err
}

// determineReplicaSetCountWhenScaling chooses between two replica counts
// depending on whether scaleDownAtTime is still in the future.
//
// While scaleDownAtTime has not yet been reached it returns
// desiredReplicaCount; if the time left is also shorter than c.resyncPeriod,
// it logs the remaining time for targetRS and calls enqueueRolloutAfter with
// that remaining duration. Once scaleDownAtTime is no longer in the future it
// returns defaultReplicaCount.
func (c *rolloutContext) determineReplicaSetCountWhenScaling(targetRS *appsv1.ReplicaSet, defaultReplicaCount int32, desiredReplicaCount int32, scaleDownAtTime time.Time) int32 {
	current := metav1.Now()
	deadline := metav1.NewTime(scaleDownAtTime)

	// The deadline is now or already behind us: use the default count.
	if !deadline.After(current.Time) {
		return defaultReplicaCount
	}

	// The deadline is still ahead. Only enqueue a follow-up when the time
	// left is below the resync period.
	timeLeft := deadline.Sub(current.Time)
	if timeLeft < c.resyncPeriod {
		c.log.Infof("hkang RS '%s' has not reached the scaleDownTime(remainingTime %s)", targetRS.Name, timeLeft)
		c.enqueueRolloutAfter(c.rollout, timeLeft)
	}
	return desiredReplicaCount
}

// reconcileOtherReplicaSets reconciles "other" ReplicaSets.
// Other ReplicaSets are ReplicaSets that are neither the new nor the stable one (allRSs - newRS - stableRS)
func (c *rolloutContext) reconcileOtherReplicaSets() (bool, error) {
Expand Down
5 changes: 5 additions & 0 deletions utils/replicaset/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ func GetCanaryReplicasOrWeight(rollout *v1alpha1.Rollout) (*int32, int32) {
return nil, 100
}
if scs := UseSetCanaryScale(rollout); scs != nil {
if rollout.Status.Abort && rollout.Spec.ScaleDownOnAbort {
log.Info("hkang ScaleDownOnAbort is true")
return nil, 0
}

if scs.Replicas != nil {
return scs.Replicas, 0
} else if scs.Weight != nil {
Expand Down

0 comments on commit f127146

Please sign in to comment.