upgrade-manager-v2: Process next batch while waiting on nodeInterval period. (#273)

* upgrade-manager-v2: remove function duplicate declaration. (#266)

* and CR end time

Signed-off-by: sbadiger <[email protected]>

* expose totalProcessing time and other metrics

Signed-off-by: sbadiger <[email protected]>

* addressing review comments

Signed-off-by: sbadiger <[email protected]>

* remove function duplication

Signed-off-by: sbadiger <[email protected]>

* Carry the metrics status in RollingUpgrade CR (#267)

* Update metrics status at same time

Signed-off-by: xshao <[email protected]>

* Update metrics status when terminating instance

Signed-off-by: xshao <[email protected]>

* Add terminated step

Signed-off-by: xshao <[email protected]>

* Add terminated step

Signed-off-by: xshao <[email protected]>

* Add terminated step

Signed-off-by: xshao <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* move cloud discovery after nodeInterval / drainInterval wait

Signed-off-by: sbadiger <[email protected]>

* Add watch event for cluster nodes instead of API calls

Signed-off-by: sbadiger <[email protected]>

* upon node deletion, remove it from syncMap as well

Signed-off-by: sbadiger <[email protected]>

* Add nodeEvents handler instead of watch handler

Signed-off-by: sbadiger <[email protected]>

* Ignore Reconciles on nodeEvents

Signed-off-by: sbadiger <[email protected]>

* Add comments

Signed-off-by: sbadiger <[email protected]>

* Set nextbatch to standBy while waiting for terminate

* Avoid parallel reconcile operation per ASG

* add default requeue time

Co-authored-by: Sheldon Shao <[email protected]>
shreyas-badiger and shaoxt authored Jul 20, 2021
1 parent b2b39a0 commit 0e64929
Showing 4 changed files with 51 additions and 45 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/rollingupgrade_types.go
@@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/keikoproj/upgrade-manager/controllers/common"
corev1 "k8s.io/api/core/v1"
@@ -189,6 +190,7 @@ var (
FiniteStates = []string{StatusComplete, StatusError}
AllowedStrategyType = []string{string(RandomUpdateStrategy), string(UniformAcrossAzUpdateStrategy)}
AllowedStrategyMode = []string{string(UpdateStrategyModeLazy), string(UpdateStrategyModeEager)}
DefaultRequeueTime = time.Second * 30
)

// RollingUpgradeCondition describes the state of the RollingUpgrade
34 changes: 23 additions & 11 deletions controllers/rollingupgrade_controller.go
@@ -18,7 +18,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/keikoproj/aws-sdk-go-cache/cache"
@@ -53,8 +52,10 @@ type RollingUpgradeReconciler struct {
DrainGroupMapper *sync.Map
DrainErrorMapper *sync.Map
ClusterNodesMap *sync.Map
ReconcileMap *sync.Map
}

// RollingUpgradeAuthenticator has the clients for providers
type RollingUpgradeAuthenticator struct {
*awsprovider.AmazonClientSet
*kubeprovider.KubernetesClientSet
@@ -69,7 +70,7 @@ type RollingUpgradeAuthenticator struct {
// +kubebuilder:rbac:groups=extensions;apps,resources=daemonsets;replicasets;statefulsets,verbs=get
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get

// Reconcile reads that state of the cluster for a RollingUpgrade object and makes changes based on the state read
// reconcile reads that state of the cluster for a RollingUpgrade object and makes changes based on the state read
// and the details in the RollingUpgrade.Spec
func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Info("***Reconciling***")
@@ -84,14 +85,14 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

// If the resource is being deleted, remove it from the admissionMap
// if the resource is being deleted, remove it from the admissionMap
if !rollingUpgrade.DeletionTimestamp.IsZero() {
r.AdmissionMap.Delete(rollingUpgrade.NamespacedName())
r.Info("rolling upgrade deleted", "name", rollingUpgrade.NamespacedName())
return reconcile.Result{}, nil
}

// Stop processing upgrades which are in finite state
// stop processing upgrades which are in finite state
currentStatus := rollingUpgrade.CurrentStatus()
if common.ContainsEqualFold(v1alpha1.FiniteStates, currentStatus) {
r.AdmissionMap.Delete(rollingUpgrade.NamespacedName())
@@ -103,13 +104,19 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return reconcile.Result{}, err
}

// defer a status update on the resource
defer r.UpdateStatus(rollingUpgrade)

var (
scalingGroupName = rollingUpgrade.ScalingGroupName()
inProgress bool
)

// Defer a status update on the resource
defer r.UpdateStatus(rollingUpgrade)
// at any given point in time, there should be only one reconcile operation running per ASG
if _, present := r.ReconcileMap.LoadOrStore(rollingUpgrade.NamespacedName(), scalingGroupName); present == true {
r.Info("a reconcile operation is already in progress for this ASG, requeuing", "scalingGroup", scalingGroupName, "name", rollingUpgrade.NamespacedName())
return ctrl.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// handle condition where multiple resources submitted targeting the same scaling group by requeing
r.AdmissionMap.Range(func(k, v interface{}) bool {
@@ -125,7 +132,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque

if inProgress {
// requeue any resources which are already being processed by a different resource, until the resource is completed/deleted
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
return ctrl.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// store the rolling upgrade in admission map
@@ -138,6 +145,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusInit)
common.SetMetricRollupInitOrRunning(rollingUpgrade.Name)

// setup the RollingUpgradeContext needed for node rotations.
drainGroup, _ := r.DrainGroupMapper.LoadOrStore(rollingUpgrade.NamespacedName(), &sync.WaitGroup{})
drainErrs, _ := r.DrainErrorMapper.LoadOrStore(rollingUpgrade.NamespacedName(), make(chan error))

@@ -167,21 +175,21 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

return reconcile.Result{RequeueAfter: time.Second * 10}, nil
return reconcile.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.RollingUpgrade{}).
Watches(&source.Kind{Type: &corev1.Node{}}, nil).
WithEventFilter(r.nodeEventsHandler()).
WithEventFilter(r.NodeEventsHandler()).
WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel}).
Complete(r)
}

// nodesEventHandler will fetch us the nodes on corresponding events, an alternative to doing explicit API calls.
func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
// NodesEventHandler will fetch us the nodes on corresponding events, an alternative to doing explicit API calls.
func (r *RollingUpgradeReconciler) NodeEventsHandler() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
nodeObj, ok := e.Object.(*corev1.Node)
@@ -216,19 +224,23 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
}
}

// number of reconciles the upgrade-manager should handle in parallel
func (r *RollingUpgradeReconciler) SetMaxParallel(n int) {
if n >= 1 {
r.Info("setting max parallel reconcile", "value", n)
r.maxParallel = n
}
}

// at the end of every reconcile, update the RollingUpgrade object
func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.RollingUpgrade) {
r.ReconcileMap.LoadAndDelete(rollingUpgrade.NamespacedName())
if err := r.Status().Update(context.Background(), rollingUpgrade); err != nil {
r.Info("failed to update status", "message", err.Error(), "name", rollingUpgrade.NamespacedName())
}
}

// extract node objects from syncMap to a slice
func (r *RollingUpgradeReconciler) getClusterNodes() []*corev1.Node {
var clusterNodes []*corev1.Node

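
The controller diff above adds ReconcileMap so that at most one reconcile runs per RollingUpgrade/ASG at a time, requeuing any overlapping attempt after v1alpha1.DefaultRequeueTime and releasing the entry when UpdateStatus runs. A minimal, stdlib-only sketch of that guard pattern (the type and function names below are illustrative stand-ins, not the project's API):

package main

import (
	"fmt"
	"sync"
	"time"
)

// defaultRequeueTime mirrors the idea behind v1alpha1.DefaultRequeueTime.
const defaultRequeueTime = 30 * time.Second

// reconciler sketches the guard: at most one in-flight reconcile per RollingUpgrade name.
type reconciler struct {
	reconcileMap sync.Map // namespaced name -> scaling group name
}

// reconcile returns a non-zero requeue delay when another reconcile for the
// same resource is already running, mimicking the LoadOrStore check in the diff.
func (r *reconciler) reconcile(name, scalingGroup string) time.Duration {
	if _, inFlight := r.reconcileMap.LoadOrStore(name, scalingGroup); inFlight {
		fmt.Printf("%s: reconcile already in progress for %s, requeue after %s\n", name, scalingGroup, defaultRequeueTime)
		return defaultRequeueTime
	}
	// the real controller releases this entry in UpdateStatus, deferred at the top of Reconcile
	defer r.reconcileMap.Delete(name)

	fmt.Printf("%s: rotating nodes in %s\n", name, scalingGroup)
	time.Sleep(100 * time.Millisecond) // stand-in for node rotation work
	return 0
}

func main() {
	r := &reconciler{}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two overlapping reconciles targeting the same resource
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.reconcile("upgrade-manager/rollup-nodes", "my-asg")
		}()
	}
	wg.Wait()
}

Because the map entry is removed in UpdateStatus, which is deferred at the top of Reconcile, a reconcile that errors out still frees its ASG for the next attempt.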
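The same file replaces explicit node API calls with NodeEventsHandler, which keeps ClusterNodesMap current from node create/update/delete events while returning false so those events never trigger a RollingUpgrade reconcile. A rough, stdlib-only approximation of that caching behavior (simplified types; the real handler is a controller-runtime predicate over *corev1.Node):

package main

import (
	"fmt"
	"sync"
)

// node is a simplified stand-in for *corev1.Node.
type node struct {
	Name    string
	Version string
}

// nodeCache mimics ClusterNodesMap: node name -> latest node object,
// maintained from watch events instead of API list calls.
type nodeCache struct {
	nodes sync.Map
}

// onCreateOrUpdate stores the latest copy of the node; the real predicate also
// returns false so node events do not queue RollingUpgrade reconciles.
func (c *nodeCache) onCreateOrUpdate(n *node) bool {
	c.nodes.Store(n.Name, n)
	return false
}

// onDelete drops the node from the cache ("upon node deletion, remove it from syncMap as well").
func (c *nodeCache) onDelete(name string) bool {
	c.nodes.Delete(name)
	return false
}

// snapshot extracts the cached node objects into a slice, like getClusterNodes.
func (c *nodeCache) snapshot() []*node {
	var out []*node
	c.nodes.Range(func(_, v interface{}) bool {
		out = append(out, v.(*node))
		return true
	})
	return out
}

func main() {
	cache := &nodeCache{}
	cache.onCreateOrUpdate(&node{Name: "ip-10-0-0-1.ec2.internal", Version: "v1.21.2"})
	cache.onCreateOrUpdate(&node{Name: "ip-10-0-0-2.ec2.internal", Version: "v1.21.2"})
	cache.onDelete("ip-10-0-0-1.ec2.internal")
	fmt.Println("cached nodes:", len(cache.snapshot()))
}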
59 changes: 25 additions & 34 deletions controllers/upgrade.go
@@ -60,35 +60,6 @@ type RollingUpgradeContext struct {
}

func (r *RollingUpgradeContext) RotateNodes() error {

// set status start time
if r.RollingUpgrade.StartTime() == "" {
r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
}
r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)

var (
lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
nodeInterval = r.RollingUpgrade.NodeIntervalSeconds()
lastDrainTime = r.RollingUpgrade.LastNodeDrainTime()
drainInterval = r.RollingUpgrade.PostDrainDelaySeconds()
)

if !lastTerminationTime.IsZero() || !lastDrainTime.IsZero() {
// Check if we are still waiting on a termination delay
if time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) {
r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName())
return nil
}

// Check if we are still waiting on a drain delay
if time.Since(lastDrainTime.Time).Seconds() < float64(drainInterval) {
r.Info("reconcile requeue due to drain interval wait", "name", r.RollingUpgrade.NamespacedName())
return nil
}
}

// discover the state of AWS and K8s cluster.
if err := r.Cloud.Discover(); err != nil {
r.Info("failed to discover the cloud", "scalingGroup", r.RollingUpgrade.ScalingGroupName(), "name", r.RollingUpgrade.NamespacedName())
@@ -230,6 +201,24 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
}

var (
lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
nodeInterval = r.RollingUpgrade.NodeIntervalSeconds()
lastDrainTime = r.RollingUpgrade.LastNodeDrainTime()
drainInterval = r.RollingUpgrade.PostDrainDelaySeconds()
)

// check if we are still waiting on a termination delay
if lastTerminationTime != nil && !lastTerminationTime.IsZero() && time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) {
r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName())
return true, nil
}
// check if we are still waiting on a drain delay
if lastDrainTime != nil && !lastDrainTime.IsZero() && time.Since(lastDrainTime.Time).Seconds() < float64(drainInterval) {
r.Info("reconcile requeue due to drain interval wait", "name", r.RollingUpgrade.NamespacedName())
return true, nil
}

if reflect.DeepEqual(r.DrainManager.DrainGroup, &sync.WaitGroup{}) {
for _, target := range batch {
instanceID := aws.StringValue(target.InstanceId)
@@ -338,6 +327,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
return true, nil
}

r.RollingUpgrade.SetLastNodeTerminationTime(&metav1.Time{Time: time.Now()})

// Turns onto NodeRotationTerminate
@@ -354,6 +344,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminated)
r.DoNodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted, terminatedTime)

}
r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

@@ -380,7 +371,10 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [
r.Info("selecting batch for rotation", "batch size", unavailableInt, "name", r.RollingUpgrade.NamespacedName())
for _, instance := range r.Cloud.InProgressInstances {
if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) {
targets = append(targets, selectedInstance)
//In-progress instances shouldn't be considered if they are in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(selectedInstance.LifecycleState)) {
targets = append(targets, selectedInstance)
}
}
}

@@ -392,10 +386,7 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [
if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy {
for _, instance := range scalingGroup.Instances {
if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) {
//In-progress instances shouldn't be considered if they are in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(instance.LifecycleState)) {
targets = append(targets, instance)
}
targets = append(targets, instance)
}
}
if unavailableInt > len(targets) {
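
The main behavioral change in controllers/upgrade.go is that the nodeInterval / drainInterval wait moved out of RotateNodes and into ReplaceNodeBatch: cloud discovery and batch selection now run even while a termination or drain delay is pending, and only the batch stands by. A small, stdlib-only sketch of that standby check (field and function names are assumptions, not the project's types):

package main

import (
	"fmt"
	"time"
)

// waitState carries the timestamps and delays that gate terminations for a batch.
type waitState struct {
	lastTermination time.Time // zero until a node has been terminated
	lastDrain       time.Time // zero until a node has been drained
	nodeInterval    int       // seconds to wait after a termination
	drainInterval   int       // seconds to wait after a drain
}

// shouldStandBy reports whether the batch should stand by (and the reconcile requeue)
// because a termination or drain delay has not fully elapsed yet.
func shouldStandBy(s waitState, now time.Time) (bool, string) {
	if !s.lastTermination.IsZero() && now.Sub(s.lastTermination).Seconds() < float64(s.nodeInterval) {
		return true, "termination interval wait"
	}
	if !s.lastDrain.IsZero() && now.Sub(s.lastDrain).Seconds() < float64(s.drainInterval) {
		return true, "drain interval wait"
	}
	return false, ""
}

func main() {
	now := time.Now()
	s := waitState{
		lastTermination: now.Add(-10 * time.Second), // last node terminated 10s ago
		nodeInterval:    300,
		drainInterval:   30,
	}
	if standBy, reason := shouldStandBy(s, now); standBy {
		fmt.Println("batch stands by, requeue due to", reason)
	}
}

In the controller this path returns (true, nil) from ReplaceNodeBatch, so the reconcile simply requeues (now after DefaultRequeueTime) instead of blocking the whole rotation.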
1 change: 1 addition & 0 deletions main.go
@@ -200,6 +200,7 @@ func main() {
DrainGroupMapper: &sync.Map{},
DrainErrorMapper: &sync.Map{},
ClusterNodesMap: &sync.Map{},
ReconcileMap: &sync.Map{},
}

reconciler.SetMaxParallel(maxParallel)
