diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index f2e50700..c5fbcefd 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -74,6 +74,9 @@ const ( KubeCtlBinary = "/usr/local/bin/kubectl" // ShellBinary is the path to the shell executable ShellBinary = "/bin/sh" + + // InService is a state of an instance + InService = "InService" ) var ( @@ -321,6 +324,9 @@ func (r *RollingUpgradeReconciler) WaitForDesiredNodes(ruObj *upgrademgrv1alpha1 } func (r *RollingUpgradeReconciler) WaitForTermination(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeName string, nodeInterface v1.NodeInterface) (bool, error) { + if nodeName == "" { + return true, nil + } started := time.Now() for { @@ -497,16 +503,14 @@ func (r *RollingUpgradeReconciler) populateNodeList(ruObj *upgrademgrv1alpha1.Ro // Loads specific environment variables for scripts to use // on a given rollingUpgrade and autoscaling instance -func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeInstance *corev1.Node) error { +func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, nodeName string) error { if err := os.Setenv(asgNameKey, ruObj.Spec.AsgName); err != nil { return errors.New(ruObj.Name + ": Could not load " + asgNameKey + ": " + err.Error()) } - tokens := strings.Split(nodeInstance.Spec.ProviderID, "/") - justID := tokens[len(tokens)-1] - if err := os.Setenv(instanceIDKey, justID); err != nil { + if err := os.Setenv(instanceIDKey, instanceID); err != nil { return errors.New(ruObj.Name + ": Could not load " + instanceIDKey + ": " + err.Error()) } - if err := os.Setenv(instanceNameKey, nodeInstance.GetName()); err != nil { + if err := os.Setenv(instanceNameKey, nodeName); err != nil { return errors.New(ruObj.Name + ": Could not load " + instanceNameKey + ": " + err.Error()) } return nil @@ -734,6 +738,9 @@ func (r *RollingUpgradeReconciler) validateNodesLaunchDefinition(ruObj *upgradem ec2instances := asg.Instances for _, ec2Instance := range ec2instances { ec2InstanceID, ec2InstanceLaunchConfig, ec2InstanceLaunchTemplate := ec2Instance.InstanceId, ec2Instance.LaunchConfigurationName, ec2Instance.LaunchTemplate + if aws.StringValue(ec2Instance.LifecycleState) == InService { + continue + } if aws.StringValue(launchConfigASG) != aws.StringValue(ec2InstanceLaunchConfig) { return fmt.Errorf("launch config mismatch, %s instance config - %s, does not match the asg config", aws.StringValue(ec2InstanceID), aws.StringValue(ec2InstanceLaunchConfig)) } else if launchTemplateASG != nil && ec2InstanceLaunchTemplate != nil { @@ -1028,14 +1035,16 @@ func (r *RollingUpgradeReconciler) DrainTerminate( ch chan error) { // Drain and wait for draining node. - err := r.DrainNode(ruObj, nodeName, KubeCtlCall, ruObj.Spec.Strategy.DrainTimeout) - if err != nil && !ruObj.Spec.IgnoreDrainFailures { - ch <- err - return + if nodeName != "" { + err := r.DrainNode(ruObj, nodeName, KubeCtlCall, ruObj.Spec.Strategy.DrainTimeout) + if err != nil && !ruObj.Spec.IgnoreDrainFailures { + ch <- err + return + } } // Terminate instance. - err = r.TerminateNode(ruObj, targetInstanceID) + err := r.TerminateNode(ruObj, targetInstanceID) if err != nil { ch <- err return @@ -1076,13 +1085,9 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context, } nodeName := r.getNodeName(i, r.NodeList, ruObj) - if nodeName == "" { - ch <- nil - return - } // Load the environment variables for scripts to run - err := loadEnvironmentVariables(ruObj, r.getNodeFromAsg(i, r.NodeList, ruObj)) + err := loadEnvironmentVariables(ruObj, targetInstanceID, nodeName) if err != nil { ch <- err return diff --git a/controllers/rollingupgrade_controller_test.go b/controllers/rollingupgrade_controller_test.go index d1cd4043..db6f6822 100644 --- a/controllers/rollingupgrade_controller_test.go +++ b/controllers/rollingupgrade_controller_test.go @@ -608,13 +608,10 @@ func TestLoadEnvironmentVariables(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Spec: upgrademgrv1alpha1.RollingUpgradeSpec{AsgName: "asg-foo"}} - mockID := "aws:///us-west-2a/fake-id-foo" + mockID := "fake-id-foo" mockName := "instance-name-foo" - node := corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: mockName}, - Spec: corev1.NodeSpec{ProviderID: mockID}} - err := loadEnvironmentVariables(ruInstance, &node) + err := loadEnvironmentVariables(ruInstance, mockID, mockName) g.Expect(err).To(gomega.BeNil()) g.Expect(os.Getenv(asgNameKey)).To(gomega.Equal("asg-foo")) @@ -2325,6 +2322,47 @@ func TestWaitForTermination(t *testing.T) { g.Expect(unjoined).To(gomega.BeTrue()) } +func TestWaitForTerminationWhenNodeIsNotFound(t *testing.T) { + g := gomega.NewGomegaWithT(t) + + TerminationTimeoutSeconds = 1 + TerminationSleepIntervalSeconds = 1 + + // nodeName is empty when a node is not found. + mockNodeName := "" + mockNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: mockNodeName, + }, + } + kuberenetesClient := fake.NewSimpleClientset() + nodeInterface := kuberenetesClient.CoreV1().Nodes() + + mgr, err := buildManager() + g.Expect(err).NotTo(gomega.HaveOccurred()) + + rcRollingUpgrade := &RollingUpgradeReconciler{ + Client: mgr.GetClient(), + Log: log2.NullLogger{}, + generatedClient: kubernetes.NewForConfigOrDie(mgr.GetConfig()), + admissionMap: sync.Map{}, + ruObjNameToASG: sync.Map{}, + ClusterState: NewClusterState(), + } + _, err = nodeInterface.Create(mockNode) + g.Expect(err).NotTo(gomega.HaveOccurred()) + ruObj := &upgrademgrv1alpha1.RollingUpgrade{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + } + + unjoined, err := rcRollingUpgrade.WaitForTermination(ruObj, mockNodeName, nodeInterface) + g.Expect(unjoined).To(gomega.BeTrue()) + g.Expect(err).To(gomega.BeNil()) +} + func buildManager() (manager.Manager, error) { err := upgrademgrv1alpha1.AddToScheme(scheme.Scheme) if err != nil { @@ -2638,6 +2676,19 @@ func TestDrainNodeTerminateTerminatesWhenIgnoreDrainFailuresSet(t *testing.T) { // done. } + // nodeName is empty when node isn't part of the cluster. It must skip drain and terminate. + ch = make(chan error, 1) + rcRollingUpgrade.DrainTerminate(ruObj, "", mockNode, KubeCtlBinary, ch) + + select { + case err, ok := <-ch: + fmt.Println(err) + g.Expect(ok).To(gomega.BeFalse()) //don't expect errors. + default: + + // done. + } + } func TestUpdateInstancesNotExists(t *testing.T) {