Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade-manager-v2: Process next batch while waiting on nodeInterval period. #273

Merged
merged 13 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1alpha1/rollingupgrade_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/keikoproj/upgrade-manager/controllers/common"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -189,6 +190,7 @@ var (
FiniteStates = []string{StatusComplete, StatusError}
AllowedStrategyType = []string{string(RandomUpdateStrategy), string(UniformAcrossAzUpdateStrategy)}
AllowedStrategyMode = []string{string(UpdateStrategyModeLazy), string(UpdateStrategyModeEager)}
DefaultRequeueTime = time.Second * 30
)

// RollingUpgradeCondition describes the state of the RollingUpgrade
Expand Down
34 changes: 23 additions & 11 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/keikoproj/aws-sdk-go-cache/cache"
Expand Down Expand Up @@ -53,8 +52,10 @@ type RollingUpgradeReconciler struct {
DrainGroupMapper *sync.Map
DrainErrorMapper *sync.Map
ClusterNodesMap *sync.Map
ReconcileMap *sync.Map
}

// RollingUpgradeAuthenticator has the clients for providers
type RollingUpgradeAuthenticator struct {
*awsprovider.AmazonClientSet
*kubeprovider.KubernetesClientSet
Expand All @@ -69,7 +70,7 @@ type RollingUpgradeAuthenticator struct {
// +kubebuilder:rbac:groups=extensions;apps,resources=daemonsets;replicasets;statefulsets,verbs=get
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get

// Reconcile reads that state of the cluster for a RollingUpgrade object and makes changes based on the state read
// reconcile reads that state of the cluster for a RollingUpgrade object and makes changes based on the state read
// and the details in the RollingUpgrade.Spec
func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Info("***Reconciling***")
Expand All @@ -84,14 +85,14 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

// If the resource is being deleted, remove it from the admissionMap
// if the resource is being deleted, remove it from the admissionMap
if !rollingUpgrade.DeletionTimestamp.IsZero() {
r.AdmissionMap.Delete(rollingUpgrade.NamespacedName())
r.Info("rolling upgrade deleted", "name", rollingUpgrade.NamespacedName())
return reconcile.Result{}, nil
}

// Stop processing upgrades which are in finite state
// stop processing upgrades which are in finite state
currentStatus := rollingUpgrade.CurrentStatus()
if common.ContainsEqualFold(v1alpha1.FiniteStates, currentStatus) {
r.AdmissionMap.Delete(rollingUpgrade.NamespacedName())
Expand All @@ -103,13 +104,19 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return reconcile.Result{}, err
}

// defer a status update on the resource
defer r.UpdateStatus(rollingUpgrade)

var (
scalingGroupName = rollingUpgrade.ScalingGroupName()
inProgress bool
)

// Defer a status update on the resource
defer r.UpdateStatus(rollingUpgrade)
// at any given point in time, there should be only one reconcile operation running per ASG
if _, present := r.ReconcileMap.LoadOrStore(rollingUpgrade.NamespacedName(), scalingGroupName); present == true {
r.Info("a reconcile operation is already in progress for this ASG, requeuing", "scalingGroup", scalingGroupName, "name", rollingUpgrade.NamespacedName())
return ctrl.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// handle the condition where multiple resources are submitted targeting the same scaling group, by requeueing
r.AdmissionMap.Range(func(k, v interface{}) bool {
Expand All @@ -125,7 +132,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque

if inProgress {
// requeue any resources which are already being processed by a different resource, until the resource is completed/deleted
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
return ctrl.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// store the rolling upgrade in admission map
Expand All @@ -138,6 +145,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusInit)
common.SetMetricRollupInitOrRunning(rollingUpgrade.Name)

// setup the RollingUpgradeContext needed for node rotations.
drainGroup, _ := r.DrainGroupMapper.LoadOrStore(rollingUpgrade.NamespacedName(), &sync.WaitGroup{})
drainErrs, _ := r.DrainErrorMapper.LoadOrStore(rollingUpgrade.NamespacedName(), make(chan error))

Expand Down Expand Up @@ -167,21 +175,21 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

return reconcile.Result{RequeueAfter: time.Second * 10}, nil
return reconcile.Result{RequeueAfter: v1alpha1.DefaultRequeueTime}, nil
}

// SetupWithManager sets up the controller with the Manager. It reconciles
// RollingUpgrade resources, watches cluster Node events (filtered through
// NodeEventsHandler so node state can be cached instead of re-fetched), and
// bounds concurrency with the configured maxParallel value.
func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.RollingUpgrade{}).
		Watches(&source.Kind{Type: &corev1.Node{}}, nil).
		// register the exported NodeEventsHandler exactly once as the event filter
		WithEventFilter(r.NodeEventsHandler()).
		WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel}).
		Complete(r)
}

// nodesEventHandler will fetch us the nodes on corresponding events, an alternative to doing explicit API calls.
func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
// NodesEventHandler will fetch us the nodes on corresponding events, an alternative to doing explicit API calls.
func (r *RollingUpgradeReconciler) NodeEventsHandler() predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
nodeObj, ok := e.Object.(*corev1.Node)
Expand Down Expand Up @@ -216,19 +224,23 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
}
}

// SetMaxParallel configures how many reconciles the upgrade-manager handles
// in parallel. Values below 1 are ignored, leaving the current setting intact.
func (r *RollingUpgradeReconciler) SetMaxParallel(n int) {
	if n < 1 {
		return
	}
	r.Info("setting max parallel reconcile", "value", n)
	r.maxParallel = n
}

// UpdateStatus runs at the end of every reconcile: it releases this
// resource's entry in ReconcileMap (the per-ASG in-progress marker) and then
// persists the RollingUpgrade status subresource. Update failures are only
// logged, not returned, since the reconcile has already completed.
func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.RollingUpgrade) {
	r.ReconcileMap.LoadAndDelete(rollingUpgrade.NamespacedName())
	err := r.Status().Update(context.Background(), rollingUpgrade)
	if err == nil {
		return
	}
	r.Info("failed to update status", "message", err.Error(), "name", rollingUpgrade.NamespacedName())
}

// extract node objects from syncMap to a slice
func (r *RollingUpgradeReconciler) getClusterNodes() []*corev1.Node {
var clusterNodes []*corev1.Node

Expand Down
59 changes: 25 additions & 34 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,6 @@ type RollingUpgradeContext struct {
}

func (r *RollingUpgradeContext) RotateNodes() error {

// set status start time
if r.RollingUpgrade.StartTime() == "" {
r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
}
r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)

var (
lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
nodeInterval = r.RollingUpgrade.NodeIntervalSeconds()
lastDrainTime = r.RollingUpgrade.LastNodeDrainTime()
drainInterval = r.RollingUpgrade.PostDrainDelaySeconds()
)

if !lastTerminationTime.IsZero() || !lastDrainTime.IsZero() {
// Check if we are still waiting on a termination delay
if time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) {
r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName())
return nil
}

// Check if we are still waiting on a drain delay
if time.Since(lastDrainTime.Time).Seconds() < float64(drainInterval) {
r.Info("reconcile requeue due to drain interval wait", "name", r.RollingUpgrade.NamespacedName())
return nil
}
}

// discover the state of AWS and K8s cluster.
if err := r.Cloud.Discover(); err != nil {
r.Info("failed to discover the cloud", "scalingGroup", r.RollingUpgrade.ScalingGroupName(), "name", r.RollingUpgrade.NamespacedName())
Expand Down Expand Up @@ -230,6 +201,24 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
}

var (
lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
nodeInterval = r.RollingUpgrade.NodeIntervalSeconds()
lastDrainTime = r.RollingUpgrade.LastNodeDrainTime()
drainInterval = r.RollingUpgrade.PostDrainDelaySeconds()
)

// check if we are still waiting on a termination delay
if lastTerminationTime != nil && !lastTerminationTime.IsZero() && time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) {
r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName())
return true, nil
}
// check if we are still waiting on a drain delay
if lastDrainTime != nil && !lastDrainTime.IsZero() && time.Since(lastDrainTime.Time).Seconds() < float64(drainInterval) {
r.Info("reconcile requeue due to drain interval wait", "name", r.RollingUpgrade.NamespacedName())
return true, nil
}

if reflect.DeepEqual(r.DrainManager.DrainGroup, &sync.WaitGroup{}) {
for _, target := range batch {
instanceID := aws.StringValue(target.InstanceId)
Expand Down Expand Up @@ -338,6 +327,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
return true, nil
}

r.RollingUpgrade.SetLastNodeTerminationTime(&metav1.Time{Time: time.Now()})

// Turns onto NodeRotationTerminate
Expand All @@ -354,6 +344,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminated)
r.DoNodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted, terminatedTime)

}
r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

Expand All @@ -380,7 +371,10 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [
r.Info("selecting batch for rotation", "batch size", unavailableInt, "name", r.RollingUpgrade.NamespacedName())
for _, instance := range r.Cloud.InProgressInstances {
if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) {
targets = append(targets, selectedInstance)
//In-progress instances shouldn't be considered if they are in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(selectedInstance.LifecycleState)) {
targets = append(targets, selectedInstance)
}
}
}

Expand All @@ -392,10 +386,7 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [
if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy {
for _, instance := range scalingGroup.Instances {
if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) {
//In-progress instances shouldn't be considered if they are in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(instance.LifecycleState)) {
targets = append(targets, instance)
}
targets = append(targets, instance)
}
}
if unavailableInt > len(targets) {
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func main() {
DrainGroupMapper: &sync.Map{},
DrainErrorMapper: &sync.Map{},
ClusterNodesMap: &sync.Map{},
ReconcileMap: &sync.Map{},
}

reconciler.SetMaxParallel(maxParallel)
Expand Down