diff --git a/controllers/providers/aws/autoscaling.go b/controllers/providers/aws/autoscaling.go index d578f1c8..6bfef0a6 100644 --- a/controllers/providers/aws/autoscaling.go +++ b/controllers/providers/aws/autoscaling.go @@ -53,3 +53,18 @@ func (a *AmazonClientSet) TerminateInstance(instance *autoscaling.Instance) erro } return nil } + +func (a *AmazonClientSet) SetInstanceStandBy(instance *autoscaling.Instance, scalingGroupName string) error { + instanceID := aws.StringValue(instance.InstanceId) + input := &autoscaling.EnterStandbyInput{ + AutoScalingGroupName: aws.String(scalingGroupName), + InstanceIds: aws.StringSlice([]string{instanceID}), + ShouldDecrementDesiredCapacity: aws.Bool(false), + } + + if _, err := a.AsgClient.EnterStandby(input); err != nil { + return err + } + + return nil +} diff --git a/controllers/providers/aws/utils.go b/controllers/providers/aws/utils.go index 3db1cbc5..28e5c340 100644 --- a/controllers/providers/aws/utils.go +++ b/controllers/providers/aws/utils.go @@ -125,3 +125,14 @@ func GetTemplateLatestVersion(templates []*ec2.LaunchTemplate, templateName stri } return "0" } + +func GetInServiceInstances(scalingGroup *autoscaling.Group) []string { + instances := scalingGroup.Instances + inServiceInstances := []string{} + for _, instance := range instances { + if aws.StringValue(instance.LifecycleState) == autoscaling.LifecycleStateInService { + inServiceInstances = append(inServiceInstances, aws.StringValue(instance.InstanceId)) + } + } + return inServiceInstances +} diff --git a/controllers/providers/kubernetes/utils.go b/controllers/providers/kubernetes/utils.go index 7f37508f..79f148b8 100644 --- a/controllers/providers/kubernetes/utils.go +++ b/controllers/providers/kubernetes/utils.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" + "github.com/keikoproj/upgrade-manager/api/v1alpha1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -87,11 +88,39 @@ func GetKubernetesLocalConfig() (*rest.Config, error) { func SelectNodeByInstanceID(instanceID string, nodes *corev1.NodeList) corev1.Node { for _, node := range nodes.Items { - tokens := strings.Split(node.Spec.ProviderID, "/") - nodeID := tokens[len(tokens)-1] + nodeID := GetNodeInstanceID(node) if strings.EqualFold(instanceID, nodeID) { return node } } return corev1.Node{} } + +func GetNodeInstanceID(node corev1.Node) string { + tokens := strings.Split(node.Spec.ProviderID, "/") + nodeInstanceID := tokens[len(tokens)-1] + return nodeInstanceID +} + +func IsNodeReady(node corev1.Node) bool { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue { + return true + } + } + return false +} + +func IsNodePassesReadinessGates(node corev1.Node, requiredReadinessGates []v1alpha1.NodeReadinessGate) bool { + if len(requiredReadinessGates) == 0 { + return true + } + for _, gate := range requiredReadinessGates { + for key, value := range gate.MatchLabels { + if node.Labels[key] != value { + return false + } + } + } + return true +} diff --git a/controllers/upgrade.go b/controllers/upgrade.go index 6e682692..76a5d78f 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -111,13 +111,25 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol // Add in-progress tag if err := r.Auth.TagEC2instance(instanceID, instanceStateTagKey, inProgressTagValue); err != nil { r.Error(err, "failed to set instance tag", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) + return false, err } // Standby + if aws.StringValue(target.LifecycleState) == autoscaling.LifecycleStateInService { + r.Info("setting instance to stand-by", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) + if err := r.Auth.SetInstanceStandBy(target, rollingUpgrade.Spec.AsgName); err != nil { + r.Error(err, "couldn't set the instance to stand-by", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) + return false, err + } + } // Wait for desired nodes + if !r.DesiredNodesReady(rollingUpgrade) { + r.Info("new node is yet to join the cluster") + return true, nil + } - // predrain script + // Predrain script if err := r.ScriptRunner.PreDrain(scriptTarget); err != nil { return false, err } @@ -137,25 +149,20 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol return false, err } - // Wait for desired nodes - - // Issue drain/scripts concurrently - set lastDrainTime - - // Is drained? - // Post Wait Script if err := r.ScriptRunner.PostWait(scriptTarget); err != nil { return false, err } // Terminate - set lastTerminateTime + r.Info("terminating instance", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) if err := r.Auth.TerminateInstance(target); err != nil { r.Info("failed to terminate instance", "name", rollingUpgrade.NamespacedName(), "instance", instanceID, "message", err) return true, nil } rollingUpgrade.SetLastNodeTerminationTime(metav1.Time{Time: time.Now()}) - // Post Wait Script + // Post Terminate Script if err := r.ScriptRunner.PostTerminate(scriptTarget); err != nil { return false, err } @@ -334,3 +341,30 @@ func (r *RollingUpgradeReconciler) IsScalingGroupDrifted(rollingUpgrade *v1alpha } return false } + +func (r *RollingUpgradeReconciler) DesiredNodesReady(rollingUpgrade *v1alpha1.RollingUpgrade) bool { + var ( + scalingGroup = awsprovider.SelectScalingGroup(rollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups) + desiredInstances = aws.Int64Value(scalingGroup.DesiredCapacity) + readyNodes = 0 + ) + + // wait for desired instances + inServiceInstances := awsprovider.GetInServiceInstances(scalingGroup) + if len(inServiceInstances) != int(desiredInstances) { + return false + } + + // wait for desired nodes + for _, node := range r.Cloud.ClusterNodes.Items { + instanceID := kubeprovider.GetNodeInstanceID(node) + if common.ContainsEqualFold(inServiceInstances, instanceID) && kubeprovider.IsNodeReady(node) && kubeprovider.IsNodePassesReadinessGates(node, rollingUpgrade.Spec.ReadinessGates) { + readyNodes++ + } + } + if readyNodes != int(desiredInstances) { + return false + } + + return true +}