Skip to content

Commit

Permalink
Terminate unjoined nodes (#120)
Browse files Browse the repository at this point in the history
* Validation step to check Nodes and ASG launch configs

* Validating launch definition after a rolling upgrade

* Resolve error log message and return statement

* Terminate unjoined nodes

* Resolving PR comments

Co-authored-by: Eytan Avisror <[email protected]>
  • Loading branch information
shreyas-badiger and eytan-avisror authored Sep 17, 2020
1 parent a27fe43 commit 5197dac
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 20 deletions.
35 changes: 20 additions & 15 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ const (
KubeCtlBinary = "/usr/local/bin/kubectl"
// ShellBinary is the path to the shell executable
ShellBinary = "/bin/sh"

// InService is the LifecycleState value reported for an ASG instance that is in service
InService = "InService"
)

var (
Expand Down Expand Up @@ -321,6 +324,9 @@ func (r *RollingUpgradeReconciler) WaitForDesiredNodes(ruObj *upgrademgrv1alpha1
}

func (r *RollingUpgradeReconciler) WaitForTermination(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeName string, nodeInterface v1.NodeInterface) (bool, error) {
if nodeName == "" {
return true, nil
}

started := time.Now()
for {
Expand Down Expand Up @@ -497,16 +503,14 @@ func (r *RollingUpgradeReconciler) populateNodeList(ruObj *upgrademgrv1alpha1.Ro

// Loads specific environment variables for scripts to use
// on a given rollingUpgrade and autoscaling instance
func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeInstance *corev1.Node) error {
func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, nodeName string) error {
if err := os.Setenv(asgNameKey, ruObj.Spec.AsgName); err != nil {
return errors.New(ruObj.Name + ": Could not load " + asgNameKey + ": " + err.Error())
}
tokens := strings.Split(nodeInstance.Spec.ProviderID, "/")
justID := tokens[len(tokens)-1]
if err := os.Setenv(instanceIDKey, justID); err != nil {
if err := os.Setenv(instanceIDKey, instanceID); err != nil {
return errors.New(ruObj.Name + ": Could not load " + instanceIDKey + ": " + err.Error())
}
if err := os.Setenv(instanceNameKey, nodeInstance.GetName()); err != nil {
if err := os.Setenv(instanceNameKey, nodeName); err != nil {
return errors.New(ruObj.Name + ": Could not load " + instanceNameKey + ": " + err.Error())
}
return nil
Expand Down Expand Up @@ -734,6 +738,9 @@ func (r *RollingUpgradeReconciler) validateNodesLaunchDefinition(ruObj *upgradem
ec2instances := asg.Instances
for _, ec2Instance := range ec2instances {
ec2InstanceID, ec2InstanceLaunchConfig, ec2InstanceLaunchTemplate := ec2Instance.InstanceId, ec2Instance.LaunchConfigurationName, ec2Instance.LaunchTemplate
if aws.StringValue(ec2Instance.LifecycleState) == InService {
continue
}
if aws.StringValue(launchConfigASG) != aws.StringValue(ec2InstanceLaunchConfig) {
return fmt.Errorf("launch config mismatch, %s instance config - %s, does not match the asg config", aws.StringValue(ec2InstanceID), aws.StringValue(ec2InstanceLaunchConfig))
} else if launchTemplateASG != nil && ec2InstanceLaunchTemplate != nil {
Expand Down Expand Up @@ -1028,14 +1035,16 @@ func (r *RollingUpgradeReconciler) DrainTerminate(
ch chan error) {

// Drain and wait for draining node.
err := r.DrainNode(ruObj, nodeName, KubeCtlCall, ruObj.Spec.Strategy.DrainTimeout)
if err != nil && !ruObj.Spec.IgnoreDrainFailures {
ch <- err
return
if nodeName != "" {
err := r.DrainNode(ruObj, nodeName, KubeCtlCall, ruObj.Spec.Strategy.DrainTimeout)
if err != nil && !ruObj.Spec.IgnoreDrainFailures {
ch <- err
return
}
}

// Terminate instance.
err = r.TerminateNode(ruObj, targetInstanceID)
err := r.TerminateNode(ruObj, targetInstanceID)
if err != nil {
ch <- err
return
Expand Down Expand Up @@ -1076,13 +1085,9 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,
}

nodeName := r.getNodeName(i, r.NodeList, ruObj)
if nodeName == "" {
ch <- nil
return
}

// Load the environment variables for scripts to run
err := loadEnvironmentVariables(ruObj, r.getNodeFromAsg(i, r.NodeList, ruObj))
err := loadEnvironmentVariables(ruObj, targetInstanceID, nodeName)
if err != nil {
ch <- err
return
Expand Down
61 changes: 56 additions & 5 deletions controllers/rollingupgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,13 +608,10 @@ func TestLoadEnvironmentVariables(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
Spec: upgrademgrv1alpha1.RollingUpgradeSpec{AsgName: "asg-foo"}}

mockID := "aws:///us-west-2a/fake-id-foo"
mockID := "fake-id-foo"
mockName := "instance-name-foo"
node := corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: mockName},
Spec: corev1.NodeSpec{ProviderID: mockID}}

err := loadEnvironmentVariables(ruInstance, &node)
err := loadEnvironmentVariables(ruInstance, mockID, mockName)
g.Expect(err).To(gomega.BeNil())

g.Expect(os.Getenv(asgNameKey)).To(gomega.Equal("asg-foo"))
Expand Down Expand Up @@ -2325,6 +2322,47 @@ func TestWaitForTermination(t *testing.T) {
g.Expect(unjoined).To(gomega.BeTrue())
}

// TestWaitForTerminationWhenNodeIsNotFound checks that WaitForTermination
// reports the node as unjoined, with no error, when it is called with an
// empty node name.
func TestWaitForTerminationWhenNodeIsNotFound(t *testing.T) {
	g := gomega.NewGomegaWithT(t)

	// Keep the package-level wait settings small for this test.
	TerminationTimeoutSeconds = 1
	TerminationSleepIntervalSeconds = 1

	// An empty node name is what callers pass when a node is not found.
	const emptyNodeName = ""

	nodes := fake.NewSimpleClientset().CoreV1().Nodes()

	mgr, mgrErr := buildManager()
	g.Expect(mgrErr).NotTo(gomega.HaveOccurred())

	reconciler := &RollingUpgradeReconciler{
		Client:          mgr.GetClient(),
		Log:             log2.NullLogger{},
		generatedClient: kubernetes.NewForConfigOrDie(mgr.GetConfig()),
		admissionMap:    sync.Map{},
		ruObjNameToASG:  sync.Map{},
		ClusterState:    NewClusterState(),
	}

	// Register a node object carrying the empty name with the fake client.
	_, createErr := nodes.Create(&corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: emptyNodeName},
	})
	g.Expect(createErr).NotTo(gomega.HaveOccurred())

	upgrade := &upgrademgrv1alpha1.RollingUpgrade{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "default",
		},
	}

	unjoined, waitErr := reconciler.WaitForTermination(upgrade, emptyNodeName, nodes)
	g.Expect(unjoined).To(gomega.BeTrue())
	g.Expect(waitErr).To(gomega.BeNil())
}

func buildManager() (manager.Manager, error) {
err := upgrademgrv1alpha1.AddToScheme(scheme.Scheme)
if err != nil {
Expand Down Expand Up @@ -2638,6 +2676,19 @@ func TestDrainNodeTerminateTerminatesWhenIgnoreDrainFailuresSet(t *testing.T) {
// done.
}

// nodeName is empty when the node isn't part of the cluster; DrainTerminate must skip the drain and go straight to terminating the instance.
ch = make(chan error, 1)
rcRollingUpgrade.DrainTerminate(ruObj, "", mockNode, KubeCtlBinary, ch)

select {
case err, ok := <-ch:
fmt.Println(err)
g.Expect(ok).To(gomega.BeFalse()) //don't expect errors.
default:

// done.
}

}

func TestUpdateInstancesNotExists(t *testing.T) {
Expand Down

0 comments on commit 5197dac

Please sign in to comment.