Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early cordon #405

Merged
merged 41 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b03c94c
early-cordon nodes
shreyas-badiger Oct 30, 2023
a304b32
early cordon
shreyas-badiger Nov 3, 2023
6a1b33e
Merge branch 'master' into early-cordon
shreyas-badiger Nov 9, 2023
951d10d
include context in cordon and drain functions
shreyas-badiger Nov 18, 2023
e770892
Merge branch 'early-cordon' of https://github.com/shreyas-badiger/upg…
shreyas-badiger Nov 18, 2023
1f255ee
cordon only drifted instances
shreyas-badiger Nov 18, 2023
ef8e0d0
add unit tests
shreyas-badiger Nov 20, 2023
b94f784
Merge branch 'master' into early-cordon
shreyas-badiger Nov 20, 2023
0f86fad
Update aws-sdk-go-cache to v0.0.2 (#399)
tekenstam Oct 24, 2023
f28163e
Process drain-failure nodes at the end (#394)
ssheladiya Oct 25, 2023
15a65aa
early-cordon nodes
shreyas-badiger Oct 30, 2023
d5415b5
early cordon
shreyas-badiger Nov 3, 2023
3cb3b0b
include context in cordon and drain functions
shreyas-badiger Nov 18, 2023
253b58b
Release v1.0.8 (#400)
shreyas-badiger Nov 1, 2023
6068e4f
cordon only drifted instances
shreyas-badiger Nov 18, 2023
d735d6b
add unit tests
shreyas-badiger Nov 20, 2023
726015a
resolve merge conflicts
shreyas-badiger Nov 20, 2023
7d73a0b
resolve merge conflicts
shreyas-badiger Nov 20, 2023
78ec2d0
update go.sum
shreyas-badiger Nov 20, 2023
44cb155
resolve test errors
shreyas-badiger Nov 20, 2023
6a5e732
remove cordon as upgrade strategy
shreyas-badiger Nov 20, 2023
21b882e
remove space
shreyas-badiger Nov 20, 2023
a72d338
improve test coverage
shreyas-badiger Nov 20, 2023
c64f516
improve code coverage
shreyas-badiger Nov 21, 2023
951afea
remove redundant code
shreyas-badiger Nov 21, 2023
6699ab7
remove unused imports
shreyas-badiger Nov 21, 2023
8a0dbc9
Merge branch 'master' into early-cordon
shreyas-badiger Nov 21, 2023
39680e2
Merge branch 'master' into early-cordon
shreyas-badiger Nov 27, 2023
5fb2c7d
Merge branch 'master' into early-cordon
shreyas-badiger Nov 29, 2023
1070ea6
Update controllers/providers/kubernetes/nodes.go
shreyas-badiger Nov 29, 2023
90e0a26
uncordon the nodes
shreyas-badiger Nov 30, 2023
caf730c
Merge branch 'master' into early-cordon
shreyas-badiger Dec 1, 2023
f7ac7be
error handling for uncordoning
shreyas-badiger Dec 1, 2023
c8d5480
Merge branch 'early-cordon' of https://github.com/shreyas-badiger/upg…
shreyas-badiger Dec 1, 2023
717b481
add tests
shreyas-badiger Dec 1, 2023
5a2a243
handle uncordon scenario properly
shreyas-badiger Dec 4, 2023
2799789
Update controllers/providers/kubernetes/nodes.go
shreyas-badiger Dec 4, 2023
bf895e5
fix typo
shreyas-badiger Dec 4, 2023
84f13e2
Merge branch 'early-cordon' of https://github.com/shreyas-badiger/upg…
shreyas-badiger Dec 4, 2023
ad93882
fix lint errors
shreyas-badiger Dec 4, 2023
5c6287d
default the feature to false
shreyas-badiger Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions controllers/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

// Tag keys and values shared by the controllers package.
// NOTE(review): this hunk shows both sides of the diff (3 deletions followed by
// 4 additions), so the first three names appear twice; only the second group,
// which adds instanceCordonTagKey, is the post-change declaration list.
var (
instanceStateTagKey = "upgrademgr.keikoproj.io/state"
inProgressTagValue = "in-progress"
failedDrainTagValue = "failed-drain"
// presumably the tag key under which an instance's upgrade state is recorded — usage not visible here, confirm
instanceStateTagKey = "upgrademgr.keikoproj.io/state"
// presumably a value stored under instanceStateTagKey while an instance is being processed — confirm
inProgressTagValue = "in-progress"
// presumably a value stored under instanceStateTagKey when draining the node failed — confirm
failedDrainTagValue = "failed-drain"
// tag key that CordonUncordonAllNodes sets (value "True") on instances and
// uses via DescribeInstancesWithoutTag to find instances not yet tagged
instanceCordonTagKey = "upgrademgr.keikoproj.io/cordon"
)

type DiscoveredState struct {
Expand Down
25 changes: 25 additions & 0 deletions controllers/providers/aws/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,31 @@ func (a *AmazonClientSet) DescribeTaggedInstanceIDs(tagKey, tagValue string) ([]
return instances, err
}

// DescribeInstancesWithoutTag pages through DescribeInstances (with an empty,
// unfiltered input) and returns the IDs of all instances that carry no tag
// whose key equals tagKey. The tag value is not inspected.
//
// The returned slice is never nil. If paging fails, the IDs collected so far
// are returned together with the error.
func (a *AmazonClientSet) DescribeInstancesWithoutTag(tagKey string) ([]string, error) {
	instances := []string{}
	input := &ec2.DescribeInstancesInput{}

	err := a.Ec2Client.DescribeInstancesPages(input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
		for _, res := range page.Reservations {
			for _, instance := range res.Instances {
				// Scoped per instance so the flag can never leak into the next one.
				tagIsPresent := false
				for _, t := range instance.Tags {
					// aws.StringValue tolerates a nil Key; a bare *t.Key would panic.
					if t != nil && aws.StringValue(t.Key) == tagKey {
						tagIsPresent = true
						break
					}
				}
				if !tagIsPresent {
					instances = append(instances, aws.StringValue(instance.InstanceId))
				}
			}
		}
		// Keep paging for as long as the service hands back a continuation token.
		return page.NextToken != nil
	})
	return instances, err
}

func (a *AmazonClientSet) TagEC2instances(instanceIDs []string, tagKey, tagValue string) error {
input := &ec2.CreateTagsInput{
Resources: aws.StringSlice(instanceIDs),
Expand Down
31 changes: 31 additions & 0 deletions controllers/providers/kubernetes/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (k *KubernetesClientSet) DrainNode(node *corev1.Node, PostDrainDelaySeconds
}

helper := &drain.Helper{
Ctx: context.Background(),
Client: client,
Force: true,
GracePeriodSeconds: -1,
Expand All @@ -75,3 +76,33 @@ func (k *KubernetesClientSet) DrainNode(node *corev1.Node, PostDrainDelaySeconds
}
return nil
}

// CordonNode cordons a node.
shreyas-badiger marked this conversation as resolved.
Show resolved Hide resolved
func (k *KubernetesClientSet) CordonUncordonNode(node *corev1.Node, client kubernetes.Interface, cordonNode bool) error {
if client == nil {
return fmt.Errorf("K8sClient not set")
}

if node == nil {
return fmt.Errorf("node not set")
}

helper := &drain.Helper{
Ctx: context.Background(),
Client: client,
Force: true,
GracePeriodSeconds: -1,
IgnoreAllDaemonSets: true,
Out: os.Stdout,
ErrOut: os.Stdout,
DeleteEmptyDirData: true,
}

if err := drain.RunCordonOrUncordon(helper, node, cordonNode); err != nil {
if apierrors.IsNotFound(err) {
return err
}
return fmt.Errorf("error cordoning node: %v", err)
}
return nil
}
8 changes: 8 additions & 0 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
IgnoreDrainFailures bool
ReplacementNodesMap *sync.Map
MaxReplacementNodes int
EarlyCordonNodes bool
}

// RollingUpgradeAuthenticator has the clients for providers
Expand Down Expand Up @@ -212,13 +213,20 @@
IgnoreDrainFailures: r.IgnoreDrainFailures,
ReplacementNodesMap: r.ReplacementNodesMap,
MaxReplacementNodes: r.MaxReplacementNodes,
EarlyCordonNodes: r.EarlyCordonNodes,

Check warning on line 216 in controllers/rollingupgrade_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rollingupgrade_controller.go#L216

Added line #L216 was not covered by tests
}

// process node rotation
if err := rollupCtx.RotateNodes(); err != nil {
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
rollingUpgrade.SetLabel(v1alpha1.LabelKeyRollingUpgradeCurrentStatus, v1alpha1.StatusError)
common.SetMetricRollupFailed(rollingUpgrade.Name)

// try to uncordon all the cordoned nodes.
if _, err2 := rollupCtx.CordonUncordonAllNodes(false); err2 != nil {
ZihanJiang96 marked this conversation as resolved.
Show resolved Hide resolved
r.Error(err2, "failed touncordon the nodes.", "name", rollingUpgrade.NamespacedName())
}

Check warning on line 229 in controllers/rollingupgrade_controller.go

View check run for this annotation

Codecov / codecov/patch

controllers/rollingupgrade_controller.go#L228-L229

Added lines #L228 - L229 were not covered by tests
return ctrl.Result{}, err
}

Expand Down
56 changes: 56 additions & 0 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
ReplacementNodesMap *sync.Map
MaxReplacementNodes int
AllowReplacements bool
EarlyCordonNodes bool
}

func (r *RollingUpgradeContext) RotateNodes() error {
Expand Down Expand Up @@ -147,6 +148,13 @@
inProcessingNodes = make(map[string]*v1alpha1.NodeInProcessing)
}

//Early-Cordon - Cordon all the nodes to avoid any further scheduling of new pods.
if r.EarlyCordonNodes {
if ok, err := r.CordonUncordonAllNodes(true); !ok {
return ok, err
}
}

Check warning on line 156 in controllers/upgrade.go

View check run for this annotation

Codecov / codecov/patch

controllers/upgrade.go#L151-L156

Added lines #L151 - L156 were not covered by tests

switch mode {
case v1alpha1.UpdateStrategyModeEager:
for _, target := range batch {
Expand Down Expand Up @@ -750,3 +758,51 @@
}
return false, batchSize
}

func (r *RollingUpgradeContext) CordonUncordonAllNodes(cordonNode bool) (bool, error) {
scalingGroup := awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups)
var instances []*autoscaling.Instance

if cordonNode {
instanceIDs, err := r.Cloud.AmazonClientSet.DescribeInstancesWithoutTag(instanceCordonTagKey)
if err != nil {
r.Error(err, "failed to describe instances for early-cordoning", "name", r.RollingUpgrade.NamespacedName())
return false, errors.Wrap(err, "failed to describe instances for early-cordoning")
}
for _, instanceID := range instanceIDs {
instance := awsprovider.SelectScalingGroupInstance(instanceID, scalingGroup)
instances = append(instances, instance)
}
} else {
instances = scalingGroup.Instances
}

for _, instance := range instances {
if !reflect.DeepEqual(instance, &autoscaling.Instance{}) {
//Don't consider if the instance is in terminating state.
if !common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(instance.LifecycleState)) {
node := kubeprovider.SelectNodeByInstanceID(*instance.InstanceId, r.Cloud.ClusterNodes)
if node == nil {
r.Info("node object not found in clusterNodes, unable to early-cordon node", "instanceID", instance.InstanceId, "name", r.RollingUpgrade.NamespacedName())
continue
}
//Early cordon only the dirfted instances and not the instances that have same scaling-config as the scaling-group
if !r.IsInstanceDrifted(instance) {
break
}
r.Info("early cordoning node", "instanceID", instance.InstanceId, "name", r.RollingUpgrade.NamespacedName())
if err := r.Auth.CordonUncordonNode(node, r.Auth.Kubernetes, true); err != nil {
shreyas-badiger marked this conversation as resolved.
Show resolved Hide resolved
r.Error(err, "failed to early cordon the nodes", "instanceID", instance.InstanceId, "name", r.RollingUpgrade.NamespacedName())
return false, err
}
}
// Add node-cordoned tag
r.Info("tagging instances with cordoned=true", "instanceID", instance.InstanceId, "name", r.RollingUpgrade.NamespacedName())
if err := r.Auth.TagEC2instances([]string{*instance.InstanceId}, instanceCordonTagKey, "True"); err != nil {
r.Error(err, "failed to tag instances with cordoned=true", "instanceID", instance.InstanceId, "name", r.RollingUpgrade.NamespacedName())
return true, err
}
}
}
return true, nil
}
40 changes: 40 additions & 0 deletions controllers/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestRotateNodes(t *testing.T) {
rollupCtx := test.RollingUpgradeContext
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient
rollupCtx.EarlyCordonNodes = true

err := rollupCtx.RotateNodes()
if err != nil {
Expand Down Expand Up @@ -507,6 +508,7 @@ func TestIgnoreDrainFailuresAndDrainTimeout(t *testing.T) {
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
rollupCtx.Cloud.ClusterNodes = test.ClusterNodes
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient
rollupCtx.EarlyCordonNodes = true

err := rollupCtx.RotateNodes()
if err != nil {
Expand Down Expand Up @@ -545,6 +547,7 @@ func TestClusterBallooning(t *testing.T) {
reconciler := createRollingUpgradeReconciler(t)
reconciler.MaxReplacementNodes = 500
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 500)
reconciler.EarlyCordonNodes = true
return reconciler
}(),
createRollingUpgrade(),
Expand Down Expand Up @@ -617,3 +620,40 @@ func TestClusterBallooning(t *testing.T) {

}
}

// TestEarlyCordonFunction runs CordonUncordonAllNodes(true) against a mocked
// scaling group and node list, failing if the call returns an error or if any
// cluster node's Spec.Unschedulable differs from the expected value.
func TestEarlyCordonFunction(t *testing.T) {
	type earlyCordonCase struct {
		TestDescription            string
		Reconciler                 *RollingUpgradeReconciler
		RollingUpgrade             *v1alpha1.RollingUpgrade
		AsgClient                  *MockAutoscalingGroup
		ClusterNodes               []*corev1.Node
		ExpectedUnschedulableValue bool
	}

	cases := []earlyCordonCase{
		{
			TestDescription:            "Test if all the nodes are cordoned by default.",
			Reconciler:                 createRollingUpgradeReconciler(t),
			RollingUpgrade:             createRollingUpgrade(),
			AsgClient:                  createASGClient(),
			ClusterNodes:               createNodeSlice(),
			ExpectedUnschedulableValue: true,
		},
	}

	for _, tc := range cases {
		// Wire the mocked cloud state into a fresh upgrade context.
		upgradeCtx := createRollingUpgradeContext(tc.Reconciler)
		upgradeCtx.RollingUpgrade = tc.RollingUpgrade
		upgradeCtx.Cloud.ScalingGroups = tc.AsgClient.autoScalingGroups
		upgradeCtx.Cloud.ClusterNodes = tc.ClusterNodes
		upgradeCtx.Auth.AmazonClientSet.AsgClient = tc.AsgClient

		if _, err := upgradeCtx.CordonUncordonAllNodes(true); err != nil {
			t.Errorf("Test Description: %s \n error: %v", tc.TestDescription, err)
		}

		for _, clusterNode := range upgradeCtx.Cloud.ClusterNodes {
			actual := clusterNode.Spec.Unschedulable
			if actual != tc.ExpectedUnschedulableValue {
				t.Errorf("Test Description: %s \n expectedValue: %v, actualValue: %v", tc.TestDescription, tc.ExpectedUnschedulableValue, actual)
			}
		}
	}
}
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func main() {
drainTimeout int
ignoreDrainFailures bool
maxReplacementNodes int
earlyCordonNodes bool
)

flag.BoolVar(&debugMode, "debug", false, "enable debug logging")
Expand All @@ -102,6 +103,7 @@ func main() {
flag.IntVar(&drainTimeout, "drain-timeout", 900, "when the drain command should timeout")
flag.BoolVar(&ignoreDrainFailures, "ignore-drain-failures", false, "proceed with instance termination despite drain failures.")
flag.IntVar(&maxReplacementNodes, "max-replacement-nodes", 0, "The max number of replacement nodes allowed in a cluster. Avoids cluster-ballooning")
flag.BoolVar(&earlyCordonNodes, "early-cordon-nodes", true, "when enabled, will cordon all the nodes in the node-group even before processing the nodes")

opts := zap.Options{
Development: true,
Expand Down Expand Up @@ -210,6 +212,7 @@ func main() {
DrainTimeout: drainTimeout,
IgnoreDrainFailures: ignoreDrainFailures,
ReplacementNodesMap: &sync.Map{},
EarlyCordonNodes: earlyCordonNodes,
}

reconciler.SetMaxParallel(maxParallel)
Expand Down