Skip to content

Commit

Permalink
Early cordon (#405)
Browse files Browse the repository at this point in the history
* early-cordon nodes

Signed-off-by: sbadiger <[email protected]>

* early cordon

Signed-off-by: sbadiger <[email protected]>

* include context in cordon and drain functions

* cordon only drifted instances

* add unit tests

* Update aws-sdk-go-cache to v0.0.2 (#399)

Signed-off-by: Todd Ekenstam <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* Process drain-failure nodes at the end (#394)

* Process drain-failures at the end
Signed-off-by: ssheladiya <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* early-cordon nodes

Signed-off-by: sbadiger <[email protected]>

* early cordon

Signed-off-by: sbadiger <[email protected]>

* include context in cordon and drain functions

Signed-off-by: sbadiger <[email protected]>

* Release v1.0.8 (#400)

Signed-off-by: sbadiger <[email protected]>

* cordon only drifted instances

Signed-off-by: sbadiger <[email protected]>

* add unit tests

Signed-off-by: sbadiger <[email protected]>

* resolve merge conflicts

* update go.sum

* resolve test errors

* remove cordon as upgrade strategy

* remove space

* improve test coverage

* improve code coverage

* remove redundant code

* remove unused imports

* Update controllers/providers/kubernetes/nodes.go

Co-authored-by: Venkata Gunapati <[email protected]>

* uncordon the nodes

* error handling for uncordoning

* add tests

* handle uncordon scenario properly

* Update controllers/providers/kubernetes/nodes.go

Co-authored-by: Venkata Gunapati <[email protected]>

* fix typo

* fix lint errors

* default the feature to false

---------

Signed-off-by: sbadiger <[email protected]>
Signed-off-by: Todd Ekenstam <[email protected]>
Co-authored-by: Todd Ekenstam <[email protected]>
Co-authored-by: Siddharth Sheladiya <[email protected]>
Co-authored-by: Venkata Gunapati <[email protected]>
  • Loading branch information
4 people authored Dec 5, 2023
1 parent 1201813 commit c2fdb37
Show file tree
Hide file tree
Showing 11 changed files with 544 additions and 254 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ all: manager
# Run tests
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: manifests generate fmt vet envtest
go test ./controllers/... ./api/...
go tool cover -html=./coverage.txt -o cover.html

# Build manager binary
Expand Down
7 changes: 4 additions & 3 deletions controllers/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

var (
instanceStateTagKey = "upgrademgr.keikoproj.io/state"
inProgressTagValue = "in-progress"
failedDrainTagValue = "failed-drain"
instanceStateTagKey = "upgrademgr.keikoproj.io/state"
inProgressTagValue = "in-progress"
failedDrainTagValue = "failed-drain"
earlyCordonedTagValue = "early-cordoned"
)

type DiscoveredState struct {
Expand Down
4 changes: 4 additions & 0 deletions controllers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,7 @@ func (mockAutoscalingGroup MockAutoscalingGroup) EnterStandby(_ *autoscaling.Ent
output := &autoscaling.EnterStandbyOutput{}
return output, nil
}

// DeleteTags is the test double for the EC2 DeleteTags call. It ignores its
// input and always reports success with an empty output value.
func (m *MockEC2) DeleteTags(input *ec2.DeleteTagsInput) (*ec2.DeleteTagsOutput, error) {
	output := &ec2.DeleteTagsOutput{}
	return output, nil
}
4 changes: 2 additions & 2 deletions controllers/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
func TestNodeTurnsOntoStep(t *testing.T) {
g := gomega.NewGomegaWithT(t)

reconsiler := createRollingUpgradeReconciler(t)
r := createRollingUpgradeContext(reconsiler)
reconciler := createRollingUpgradeReconciler(t)
r := createRollingUpgradeContext(reconciler)

//A map to retain the steps for multiple nodes
nodeSteps := make(map[string][]v1alpha1.NodeStepDuration)
Expand Down
39 changes: 39 additions & 0 deletions controllers/providers/aws/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,31 @@ func (a *AmazonClientSet) DescribeTaggedInstanceIDs(tagKey, tagValue string) ([]
return instances, err
}

// DescribeInstancesWithoutTagValue pages through DescribeInstances (with no
// filters) and returns the IDs of every instance that does NOT carry the tag
// tagKey set to tagValue. Instances that have tagKey with a different value,
// or no such tag at all, are included in the result.
//
// The returned slice is never nil. If the paginated call fails, the error is
// returned together with whatever IDs were collected before the failure.
func (a *AmazonClientSet) DescribeInstancesWithoutTagValue(tagKey string, tagValue string) ([]string, error) {
	instances := []string{}
	input := &ec2.DescribeInstancesInput{}

	err := a.Ec2Client.DescribeInstancesPages(input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
		for _, res := range page.Reservations {
			for _, instance := range res.Instances {
				// The flag is scoped per instance so it can never leak a
				// match from one instance into the next.
				tagAndValueIsPresent := false
				for _, t := range instance.Tags {
					if t == nil {
						continue
					}
					// aws.StringValue is nil-safe; a tag with a missing key
					// or value must not panic the controller.
					if aws.StringValue(t.Key) == tagKey && aws.StringValue(t.Value) == tagValue {
						tagAndValueIsPresent = true
						break
					}
				}
				if !tagAndValueIsPresent {
					instances = append(instances, aws.StringValue(instance.InstanceId))
				}
			}
		}
		// Keep paging for as long as the service hands back a continuation token.
		return page.NextToken != nil
	})
	return instances, err
}

func (a *AmazonClientSet) TagEC2instances(instanceIDs []string, tagKey, tagValue string) error {
input := &ec2.CreateTagsInput{
Resources: aws.StringSlice(instanceIDs),
Expand All @@ -75,3 +100,17 @@ func (a *AmazonClientSet) TagEC2instances(instanceIDs []string, tagKey, tagValue
_, err := a.Ec2Client.CreateTags(input)
return err
}

// UntagEC2instances removes the tag tagKey from the given instances through
// the EC2 DeleteTags API. Because the request carries tagValue as well, EC2
// only deletes the tag on resources where its current value equals tagValue.
//
// An empty instanceIDs list is a no-op: DeleteTags requires at least one
// resource, so sending the request would only produce an avoidable API error.
func (a *AmazonClientSet) UntagEC2instances(instanceIDs []string, tagKey, tagValue string) error {
	if len(instanceIDs) == 0 {
		return nil
	}

	input := &ec2.DeleteTagsInput{
		Resources: aws.StringSlice(instanceIDs),
		Tags: []*ec2.Tag{
			{
				Key:   aws.String(tagKey),
				Value: aws.String(tagValue),
			},
		},
	}
	_, err := a.Ec2Client.DeleteTags(input)
	return err
}
31 changes: 31 additions & 0 deletions controllers/providers/kubernetes/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (k *KubernetesClientSet) DrainNode(node *corev1.Node, PostDrainDelaySeconds
}

helper := &drain.Helper{
Ctx: context.Background(),
Client: client,
Force: true,
GracePeriodSeconds: -1,
Expand All @@ -75,3 +76,33 @@ func (k *KubernetesClientSet) DrainNode(node *corev1.Node, PostDrainDelaySeconds
}
return nil
}

// CordonUncordonNode marks node as unschedulable (cordon) when cordonNode is
// true, and as schedulable again (uncordon) when cordonNode is false, using
// the kubectl drain helper against the supplied client.
//
// It returns an error when client or node is nil. A NotFound error from the
// API server is returned unwrapped so callers can keep testing it with
// apierrors.IsNotFound; any other failure is wrapped with the attempted action.
func (k *KubernetesClientSet) CordonUncordonNode(node *corev1.Node, client kubernetes.Interface, cordonNode bool) error {
	if client == nil {
		return fmt.Errorf("K8sClient not set")
	}

	if node == nil {
		return fmt.Errorf("node not set")
	}

	helper := &drain.Helper{
		Ctx:                 context.Background(),
		Client:              client,
		Force:               true,
		GracePeriodSeconds:  -1,
		IgnoreAllDaemonSets: true,
		Out:                 os.Stdout,
		ErrOut:              os.Stdout,
		DeleteEmptyDirData:  true,
	}

	if err := drain.RunCordonOrUncordon(helper, node, cordonNode); err != nil {
		if apierrors.IsNotFound(err) {
			return err
		}
		// Report the action that was actually attempted; this function is
		// used for uncordoning as well as cordoning.
		action := "cordoning"
		if !cordonNode {
			action = "uncordoning"
		}
		return fmt.Errorf("error %s node: %w", action, err)
	}
	return nil
}
8 changes: 8 additions & 0 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RollingUpgradeReconciler struct {
IgnoreDrainFailures bool
ReplacementNodesMap *sync.Map
MaxReplacementNodes int
EarlyCordonNodes bool
}

// RollingUpgradeAuthenticator has the clients for providers
Expand Down Expand Up @@ -212,13 +213,20 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
IgnoreDrainFailures: r.IgnoreDrainFailures,
ReplacementNodesMap: r.ReplacementNodesMap,
MaxReplacementNodes: r.MaxReplacementNodes,
EarlyCordonNodes: r.EarlyCordonNodes,
}

// process node rotation
if err := rollupCtx.RotateNodes(); err != nil {
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
rollingUpgrade.SetLabel(v1alpha1.LabelKeyRollingUpgradeCurrentStatus, v1alpha1.StatusError)
common.SetMetricRollupFailed(rollingUpgrade.Name)

// try to uncordon all the cordoned nodes.
if _, err2 := rollupCtx.CordonUncordonAllNodes(false); err2 != nil {
r.Error(err2, "failed to uncordon the nodes.", "name", rollingUpgrade.NamespacedName())
}

return ctrl.Result{}, err
}

Expand Down
63 changes: 63 additions & 0 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type RollingUpgradeContext struct {
ReplacementNodesMap *sync.Map
MaxReplacementNodes int
AllowReplacements bool
EarlyCordonNodes bool
}

func (r *RollingUpgradeContext) RotateNodes() error {
Expand Down Expand Up @@ -147,6 +148,13 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
inProcessingNodes = make(map[string]*v1alpha1.NodeInProcessing)
}

//Early-Cordon - Cordon all the nodes to avoid any further scheduling of new pods.
if r.EarlyCordonNodes {
if ok, err := r.CordonUncordonAllNodes(true); !ok {
return ok, err
}
}

switch mode {
case v1alpha1.UpdateStrategyModeEager:
for _, target := range batch {
Expand Down Expand Up @@ -750,3 +758,58 @@ func (r *RollingUpgradeContext) ClusterBallooning(batchSize int) (bool, int) {
}
return false, batchSize
}

// CordonUncordonAllNodes early-cordons (cordonNode == true) or uncordons
// (cordonNode == false) the nodes backing the rolling upgrade's scaling group.
//
// Cordon path: candidates are the instances that do not yet carry the
// early-cordoned state tag. Each drifted, non-terminating instance that has a
// matching node object is cordoned and then tagged as early-cordoned.
//
// Uncordon path: candidates are the instances that carry the early-cordoned
// state tag. The tag is removed (best effort, failures are only logged) and
// each matching node is uncordoned. The tag is NOT re-applied.
//
// Return values:
//   - (false, err) when the candidate lookup for cordoning fails or when a
//     node cannot be cordoned/uncordoned;
//   - (true, err) when the node was cordoned but tagging the instance failed,
//     so callers that only gate on the boolean treat it as non-fatal;
//   - (true, nil) otherwise.
func (r *RollingUpgradeContext) CordonUncordonAllNodes(cordonNode bool) (bool, error) {
	scalingGroup := awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups)
	var instanceIDs []string
	var err error

	if cordonNode {
		instanceIDs, err = r.Cloud.AmazonClientSet.DescribeInstancesWithoutTagValue(instanceStateTagKey, earlyCordonedTagValue)
		if err != nil {
			r.Error(err, "failed to describe instances for early-cordoning", "name", r.RollingUpgrade.NamespacedName())
			return false, errors.Wrap(err, "failed to describe instances for early-cordoning")
		}
	} else {
		// Uncordoning is a best-effort cleanup: lookup and untag failures are
		// logged and whatever instance IDs were obtained are still processed.
		instanceIDs, err = r.Auth.DescribeTaggedInstanceIDs(instanceStateTagKey, earlyCordonedTagValue)
		if err != nil {
			r.Error(err, "failed to discover ec2 instances with early-cordoned tag", "name", r.RollingUpgrade.NamespacedName())
		}

		// Skip the API call when there is nothing to untag; DeleteTags
		// needs at least one resource.
		if len(instanceIDs) > 0 {
			r.Info("removing early-cordoning tag while uncordoning instances", "name", r.RollingUpgrade.NamespacedName())
			if err := r.Auth.UntagEC2instances(instanceIDs, instanceStateTagKey, earlyCordonedTagValue); err != nil {
				r.Error(err, "failed to delete early-cordoned tag for instances", "name", r.RollingUpgrade.NamespacedName())
			}
		}
	}

	// Pick log messages that match the action actually being performed.
	startMsg, failMsg, missingNodeMsg := "early cordoning node", "failed to early cordon the nodes", "node object not found in clusterNodes, unable to early-cordon node"
	if !cordonNode {
		startMsg, failMsg, missingNodeMsg = "uncordoning node", "failed to uncordon the nodes", "node object not found in clusterNodes, unable to uncordon node"
	}

	for _, instanceID := range instanceIDs {
		instance := awsprovider.SelectScalingGroupInstance(instanceID, scalingGroup)
		// An empty instance means the ID does not belong to this scaling group.
		if instance == nil || reflect.DeepEqual(instance, &autoscaling.Instance{}) {
			continue
		}
		// Don't consider the instance if it is in a terminating state.
		if common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(instance.LifecycleState)) {
			continue
		}

		id := aws.StringValue(instance.InstanceId)
		node := kubeprovider.SelectNodeByInstanceID(id, r.Cloud.ClusterNodes)
		if node == nil {
			r.Info(missingNodeMsg, "instanceID", id, "name", r.RollingUpgrade.NamespacedName())
			continue
		}

		// Only drifted instances are handled; instances that already match the
		// scaling group's configuration are skipped. This must be `continue`,
		// not `break`: one up-to-date instance must not stop the remaining
		// instances from being processed.
		if !r.IsInstanceDrifted(instance) {
			continue
		}

		r.Info(startMsg, "instanceID", id, "name", r.RollingUpgrade.NamespacedName())
		if err := r.Auth.CordonUncordonNode(node, r.Auth.Kubernetes, cordonNode); err != nil {
			r.Error(err, failMsg, "instanceID", id, "name", r.RollingUpgrade.NamespacedName())
			return false, err
		}

		// The early-cordoned tag records a cordon; when uncordoning it has
		// just been removed above and must not be put back.
		if !cordonNode {
			continue
		}

		// Set instance-state to the early-cordoned tag.
		r.Info("tagging instances with cordoned=true", "instanceID", id, "name", r.RollingUpgrade.NamespacedName())
		if err := r.Auth.TagEC2instances([]string{id}, instanceStateTagKey, earlyCordonedTagValue); err != nil {
			r.Error(err, "failed to tag instances with cordoned=true", "instanceID", id, "name", r.RollingUpgrade.NamespacedName())
			return true, err
		}
	}
	return true, nil
}
63 changes: 63 additions & 0 deletions controllers/upgrade_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"context"
"os"
"testing"

Expand Down Expand Up @@ -105,6 +106,7 @@ func TestRunCordonOrUncordon(t *testing.T) {
for _, test := range tests {
rollupCtx := createRollingUpgradeContext(test.Reconciler)
helper := &drain.Helper{
Ctx: context.Background(),
Client: rollupCtx.Auth.Kubernetes,
Force: true,
GracePeriodSeconds: -1,
Expand Down Expand Up @@ -299,6 +301,7 @@ func TestRotateNodes(t *testing.T) {
rollupCtx := test.RollingUpgradeContext
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient
rollupCtx.EarlyCordonNodes = true

err := rollupCtx.RotateNodes()
if err != nil {
Expand Down Expand Up @@ -507,6 +510,7 @@ func TestIgnoreDrainFailuresAndDrainTimeout(t *testing.T) {
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
rollupCtx.Cloud.ClusterNodes = test.ClusterNodes
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient
rollupCtx.EarlyCordonNodes = true

err := rollupCtx.RotateNodes()
if err != nil {
Expand Down Expand Up @@ -545,6 +549,7 @@ func TestClusterBallooning(t *testing.T) {
reconciler := createRollingUpgradeReconciler(t)
reconciler.MaxReplacementNodes = 500
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 500)
reconciler.EarlyCordonNodes = true
return reconciler
}(),
createRollingUpgrade(),
Expand Down Expand Up @@ -617,3 +622,61 @@ func TestClusterBallooning(t *testing.T) {

}
}

// TestCordoningAndUncordoningOfNodes exercises CordonUncordonNode for the
// cordon, uncordon, and unknown-node cases. Each case states whether an error
// is expected and, for the success cases, what node.Spec.Unschedulable must be
// after the call.
func TestCordoningAndUncordoningOfNodes(t *testing.T) {
	var tests = []struct {
		TestDescription            string
		Reconciler                 *RollingUpgradeReconciler
		Node                       *corev1.Node
		CordonNodeFlag             bool
		ExpectedUnschedulableValue bool
		ExpectedError              bool
	}{
		{
			"Test if all the nodes are cordoned.",
			createRollingUpgradeReconciler(t),
			createNode("mock-node-1"),
			true,
			true,
			false,
		},
		{
			"Test if all the nodes are uncordoned",
			createRollingUpgradeReconciler(t),
			createNode("mock-node-1"),
			false,
			false,
			false,
		},
		{
			"Try to cordon an unknown node.",
			createRollingUpgradeReconciler(t),
			createNode("mock-node-4"),
			true,
			true,
			true,
		},
	}
	for _, test := range tests {
		rollupCtx := createRollingUpgradeContext(test.Reconciler)

		// By default, nodes are uncordoned. Therefore, before testing uncordoning the node, first cordon it.
		if !test.CordonNodeFlag {
			if err := rollupCtx.Auth.CordonUncordonNode(test.Node, rollupCtx.Auth.Kubernetes, true); err != nil {
				t.Errorf("Test Description: %s \n error: %v", test.TestDescription, err)
				continue
			}
		}

		err := rollupCtx.Auth.CordonUncordonNode(test.Node, rollupCtx.Auth.Kubernetes, test.CordonNodeFlag)

		// An expected failure must actually fail; a missing error is a test failure,
		// not something to fall through and treat as success.
		if test.ExpectedError {
			if err == nil {
				t.Errorf("Test Description: %s \n expected an error, got none", test.TestDescription)
			}
			continue
		}

		if err != nil {
			t.Errorf("Test Description: %s \n error: %v", test.TestDescription, err)
			continue
		}

		if test.ExpectedUnschedulableValue != test.Node.Spec.Unschedulable {
			t.Errorf("Test Description: %s \n expectedValue: %v, actualValue: %v", test.TestDescription, test.ExpectedUnschedulableValue, test.Node.Spec.Unschedulable)
		}
	}
}
Loading

0 comments on commit c2fdb37

Please sign in to comment.