Skip to content

Commit

Permalink
fix: Fix premature scaledown (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
dthomson25 authored Jan 22, 2020
1 parent 96f3ca4 commit 9b9f341
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 67 deletions.
64 changes: 37 additions & 27 deletions rollout/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ func (c *RolloutController) reconcileBlueGreenPause(activeSvc, previewSvc *corev
}

// scaleDownOldReplicaSetsForBlueGreen scales down old replica sets when rollout strategy is "Blue Green".
func (c *RolloutController) scaleDownOldReplicaSetsForBlueGreen(oldRSs []*appsv1.ReplicaSet, rollout *v1alpha1.Rollout) (int32, error) {
func (c *RolloutController) scaleDownOldReplicaSetsForBlueGreen(oldRSs []*appsv1.ReplicaSet, rollout *v1alpha1.Rollout) (bool, error) {
logCtx := logutil.WithRollout(rollout)
sort.Sort(sort.Reverse(replicasetutil.ReplicaSetsByRevisionNumber(oldRSs)))

totalScaledDown := int32(0)
hasScaled := false
annotationedRSs := int32(0)
rolloutReplicas := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
for _, targetRS := range oldRSs {
desiredReplicaCount := int32(0)
if scaleDownAtStr, ok := targetRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; ok {
annotationedRSs++
scaleDownAtTime, err := time.Parse(time.RFC3339, scaleDownAtStr)
Expand All @@ -201,26 +203,23 @@ func (c *RolloutController) scaleDownOldReplicaSetsForBlueGreen(oldRSs []*appsv1
if remainingTime < c.resyncPeriod {
c.enqueueRolloutAfter(rollout, remainingTime)
}
continue
desiredReplicaCount = rolloutReplicas
}
}
}
if *(targetRS.Spec.Replicas) == 0 {
// cannot scale down this ReplicaSet.
if *(targetRS.Spec.Replicas) == desiredReplicaCount {
		// at desired count
continue
}
scaleDownCount := *(targetRS.Spec.Replicas)
// Scale down.
newReplicasCount := int32(0)
_, _, err := c.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, rollout)
_, _, err := c.scaleReplicaSetAndRecordEvent(targetRS, desiredReplicaCount, rollout)
if err != nil {
return totalScaledDown, err
return false, err
}

totalScaledDown += scaleDownCount
hasScaled = true
}

return totalScaledDown, nil
return hasScaled, nil
}

func (c *RolloutController) syncRolloutStatusBlueGreen(previewSvc *corev1.Service, activeSvc *corev1.Service, roCtx *blueGreenContext) error {
Expand Down Expand Up @@ -297,14 +296,6 @@ func calculateScaleUpPreviewCheckPoint(roCtx *blueGreenContext, activeRS *appsv1
// Should run only on scaling events and not during the normal rollout process.
func (c *RolloutController) scaleBlueGreen(rollout *v1alpha1.Rollout, newRS *appsv1.ReplicaSet, oldRSs []*appsv1.ReplicaSet, previewSvc *corev1.Service, activeSvc *corev1.Service) error {
rolloutReplicas := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
previewSelector, ok := serviceutil.GetRolloutSelectorLabel(previewSvc)
if !ok {
previewSelector = ""
}
activeSelector, ok := serviceutil.GetRolloutSelectorLabel(activeSvc)
if !ok {
activeSelector = ""
}

// If there is only one replica set with pods, then we should scale that up to the full count of the
// rollout. If there is no replica set with pods, then we should scale up the newest replica set.
Expand Down Expand Up @@ -335,13 +326,32 @@ func (c *RolloutController) scaleBlueGreen(rollout *v1alpha1.Rollout, newRS *app
}
}

// Old replica sets should be fully scaled down if they aren't receiving traffic from the active or
// preview service. This case handles replica set adoption during a saturated new replica set.
for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
oldLabel, ok := old.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
if !ok || (oldLabel != activeSelector && oldLabel != previewSelector) {
if _, _, err := c.scaleReplicaSetAndRecordEvent(old, 0, rollout); err != nil {
return err
sort.Sort(sort.Reverse(replicasetutil.ReplicaSetsByRevisionNumber(oldRSs)))

annotationedRSs := int32(0)
logCtx := logutil.WithRollout(rollout)
for _, targetRS := range oldRSs {
newDesiredReplicas := int32(0)
if scaleDownAtStr, ok := targetRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; ok {
annotationedRSs++
scaleDownAtTime, err := time.Parse(time.RFC3339, scaleDownAtStr)
if err != nil {
logCtx.Warnf("Unable to read scaleDownAt label on rs '%s'", targetRS.Name)
} else if rollout.Spec.Strategy.BlueGreen.ScaleDownDelayRevisionLimit != nil && annotationedRSs == *rollout.Spec.Strategy.BlueGreen.ScaleDownDelayRevisionLimit {
logCtx.Info("At ScaleDownDelayRevisionLimit and scaling down the rest")
} else {
now := metav1.Now()
scaleDownAt := metav1.NewTime(scaleDownAtTime)
if scaleDownAt.After(now.Time) {
newDesiredReplicas = rolloutReplicas
}
}

if *(targetRS.Spec.Replicas) != newDesiredReplicas {
_, _, err := c.scaleReplicaSetAndRecordEvent(targetRS, newDesiredReplicas, rollout)
if err != nil {
return err
}
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,10 @@ func (c *RolloutController) reconcileOldReplicaSetsCanary(allRSs []*appsv1.Repli

// Clean up unhealthy replicas first, otherwise unhealthy replicas will block rollout
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldRSs, cleanupCount, err := c.cleanupUnhealthyReplicas(oldRSs, roCtx)
oldRSs, cleanedUpRSs, err := c.cleanupUnhealthyReplicas(oldRSs, roCtx)
if err != nil {
return false, nil
}
logCtx.Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

// Scale down old replica sets, need check replicasToKeep to ensure we can scale down
scaledDownCount, err := c.scaleDownOldReplicaSetsForCanary(allRSs, oldRSs, rollout)
Expand All @@ -175,8 +174,7 @@ func (c *RolloutController) reconcileOldReplicaSetsCanary(allRSs []*appsv1.Repli
}
logCtx.Infof("Scaled down old RSes by %d", scaledDownCount)

totalScaledDown := cleanupCount + scaledDownCount
return totalScaledDown > 0, nil
return cleanedUpRSs || scaledDownCount > 0, nil
}

// scaleDownOldReplicaSetsForCanary scales down old replica sets when rollout strategy is "canary".
Expand Down
32 changes: 15 additions & 17 deletions rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/defaults"
logutil "github.com/argoproj/argo-rollouts/utils/log"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
)

Expand Down Expand Up @@ -87,38 +86,37 @@ func (c *RolloutController) reconcileNewReplicaSet(roCtx rolloutContext) (bool,

func (c *RolloutController) reconcileOldReplicaSets(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) (bool, error) {
rollout := roCtx.Rollout()
logCtx := logutil.WithRollout(rollout)

oldPodsCount := replicasetutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
// Can't scale down further
return false, nil
}

// Clean up unhealthy replicas first, otherwise unhealthy replicas will block rollout
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldRSs, cleanupCount, err := c.cleanupUnhealthyReplicas(oldRSs, roCtx)
if err != nil {
return false, nil
var err error
hasScaled := false
if rollout.Spec.Strategy.Canary != nil {
// Clean up unhealthy replicas first, otherwise unhealthy replicas will block rollout
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldRSs, hasScaled, err = c.cleanupUnhealthyReplicas(oldRSs, roCtx)
if err != nil {
return false, nil
}
}
logCtx.Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

// Scale down old replica sets
scaledDownCount := int32(0)
if rollout.Spec.Strategy.BlueGreen != nil {
scaledDownCount, err = c.scaleDownOldReplicaSetsForBlueGreen(oldRSs, rollout)
hasScaled, err = c.scaleDownOldReplicaSetsForBlueGreen(oldRSs, rollout)
if err != nil {
return false, nil
}
}
logCtx.Infof("Scaled down old RSes by %d", scaledDownCount)

totalScaledDown := cleanupCount + scaledDownCount
return totalScaledDown > 0, nil
return hasScaled, nil
}

// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (c *RolloutController) cleanupUnhealthyReplicas(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) ([]*appsv1.ReplicaSet, int32, error) {
func (c *RolloutController) cleanupUnhealthyReplicas(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) ([]*appsv1.ReplicaSet, bool, error) {
logCtx := roCtx.Log()
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
Expand All @@ -139,14 +137,14 @@ func (c *RolloutController) cleanupUnhealthyReplicas(oldRSs []*appsv1.ReplicaSet
scaledDownCount := *(targetRS.Spec.Replicas) - targetRS.Status.AvailableReplicas
newReplicasCount := targetRS.Status.AvailableReplicas
if newReplicasCount > *(targetRS.Spec.Replicas) {
return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
return nil, false, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
}
_, updatedOldRS, err := c.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, roCtx.Rollout())
if err != nil {
return nil, totalScaledDown, err
return nil, totalScaledDown > 0, err
}
totalScaledDown += scaledDownCount
oldRSs[i] = updatedOldRS
}
return oldRSs, totalScaledDown, nil
return oldRSs, totalScaledDown > 0, nil
}
19 changes: 0 additions & 19 deletions rollout/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,25 +109,6 @@ func TestScaleBlueGreen(t *testing.T) {
previewSvc: nil,
activeSvc: nil,
},
{
name: "Scale down older active replica sets",
rollout: newRolloutWithStatus("foo", 5, nil, nil),
oldRollout: newRolloutWithStatus("foo", 5, nil, nil),

newRS: rs("foo-v2", 5, nil, newTimestamp, nil),
oldRSs: []*appsv1.ReplicaSet{
rs("foo-v1", 5, map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: "foo-v1"}, oldTimestamp, nil),
rs("foo-v0", 5, nil, olderTimestamp, nil),
},

expectedNew: rs("foo-v2", 5, nil, newTimestamp, nil),
expectedOld: []*appsv1.ReplicaSet{
rs("foo-v1", 5, map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: "foo-v1"}, oldTimestamp, nil),
rs("foo-v0", 0, nil, olderTimestamp, nil),
},
previewSvc: nil,
activeSvc: newService("active", 80, map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: "foo-v1"}),
},
{
name: "No updates",
rollout: newRolloutWithStatus("foo", 5, nil, nil),
Expand Down

0 comments on commit 9b9f341

Please sign in to comment.