From 0e4de1bdabac84ca4e7da74b6d64b5b9c3661f18 Mon Sep 17 00:00:00 2001 From: sbadiger Date: Thu, 30 Sep 2021 12:02:34 -0700 Subject: [PATCH 1/3] controller flags for ignoreDrainFailures and drainTimeout Signed-off-by: sbadiger --- api/v1alpha1/rollingupgrade_types.go | 14 ++++---- api/v1alpha1/zz_generated.deepcopy.go | 12 ++++++- ...grademgr.keikoproj.io_rollingupgrades.yaml | 2 -- controllers/helpers_test.go | 3 +- controllers/rollingupgrade_controller.go | 27 ++++++++------ controllers/upgrade.go | 36 ++++++++++++++----- controllers/upgrade_test.go | 6 ++-- main.go | 14 +++++--- 8 files changed, 76 insertions(+), 38 deletions(-) diff --git a/api/v1alpha1/rollingupgrade_types.go b/api/v1alpha1/rollingupgrade_types.go index 078b1c3e..de9a7690 100644 --- a/api/v1alpha1/rollingupgrade_types.go +++ b/api/v1alpha1/rollingupgrade_types.go @@ -37,7 +37,7 @@ type RollingUpgradeSpec struct { PostDrain PostDrainSpec `json:"postDrain,omitempty"` PostTerminate PostTerminateSpec `json:"postTerminate,omitempty"` Strategy UpdateStrategy `json:"strategy,omitempty"` - IgnoreDrainFailures bool `json:"ignoreDrainFailures,omitempty"` + IgnoreDrainFailures *bool `json:"ignoreDrainFailures,omitempty"` ForceRefresh bool `json:"forceRefresh,omitempty"` ReadinessGates []NodeReadinessGate `json:"readinessGates,omitempty"` } @@ -216,7 +216,7 @@ type UpdateStrategy struct { Type UpdateStrategyType `json:"type,omitempty"` Mode UpdateStrategyMode `json:"mode,omitempty"` MaxUnavailable intstr.IntOrString `json:"maxUnavailable,omitempty"` - DrainTimeout int `json:"drainTimeout"` + DrainTimeout *int `json:"drainTimeout,omitempty"` } func (c UpdateStrategyMode) String() string { @@ -232,7 +232,7 @@ func (r *RollingUpgrade) ScalingGroupName() string { return r.Spec.AsgName } -func (r *RollingUpgrade) DrainTimeout() int { +func (r *RollingUpgrade) DrainTimeout() *int { return r.Spec.Strategy.DrainTimeout } @@ -331,7 +331,7 @@ func (r *RollingUpgrade) IsForceRefresh() bool { return r.Spec.ForceRefresh } -func (r *RollingUpgrade) IsIgnoreDrainFailures() bool { +func (r *RollingUpgrade) IsIgnoreDrainFailures() *bool { return r.Spec.IgnoreDrainFailures } func (r *RollingUpgrade) StrategyMode() UpdateStrategyMode { @@ -372,9 +372,9 @@ func (r *RollingUpgrade) Validate() (bool, error) { } // validating the DrainTimeout value - if strategy.DrainTimeout == 0 { - r.Spec.Strategy.DrainTimeout = -1 - } else if strategy.DrainTimeout < -1 { + if *strategy.DrainTimeout == 0 { + *r.Spec.Strategy.DrainTimeout = -1 + } else if *strategy.DrainTimeout < -1 { err := fmt.Errorf("%s: Invalid value for startegy DrainTimeout - %d", r.Name, strategy.MaxUnavailable.IntVal) return false, err } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 62489a6f..3057b24e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -205,7 +205,12 @@ func (in *RollingUpgradeSpec) DeepCopyInto(out *RollingUpgradeSpec) { out.PreDrain = in.PreDrain out.PostDrain = in.PostDrain out.PostTerminate = in.PostTerminate - out.Strategy = in.Strategy + in.Strategy.DeepCopyInto(&out.Strategy) + if in.IgnoreDrainFailures != nil { + in, out := &in.IgnoreDrainFailures, &out.IgnoreDrainFailures + *out = new(bool) + **out = **in + } if in.ReadinessGates != nil { in, out := &in.ReadinessGates, &out.ReadinessGates *out = make([]NodeReadinessGate, len(*in)) @@ -304,6 +309,11 @@ func (in *RollingUpgradeStatus) DeepCopy() *RollingUpgradeStatus { func (in *UpdateStrategy) DeepCopyInto(out *UpdateStrategy) { *out = *in out.MaxUnavailable = in.MaxUnavailable + if in.DrainTimeout != nil { + in, out := &in.DrainTimeout, &out.DrainTimeout + *out = new(int) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpdateStrategy. diff --git a/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml b/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml index 3e1012ed..b7c9ef39 100644 --- a/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml +++ b/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml @@ -115,8 +115,6 @@ spec: type: string type: type: string - required: - - drainTimeout type: object type: object status: diff --git a/controllers/helpers_test.go b/controllers/helpers_test.go index 8e6d961b..cb3dae4e 100644 --- a/controllers/helpers_test.go +++ b/controllers/helpers_test.go @@ -85,8 +85,7 @@ func createRollingUpgrade() *v1alpha1.RollingUpgrade { AsgName: "mock-asg-1", PostDrainDelaySeconds: 30, Strategy: v1alpha1.UpdateStrategy{ - Type: v1alpha1.RandomUpdateStrategy, - DrainTimeout: 30, + Type: v1alpha1.RandomUpdateStrategy, }, }, } diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index b57dbd39..e8372010 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -42,17 +42,19 @@ import ( type RollingUpgradeReconciler struct { client.Client logr.Logger - Scheme *runtime.Scheme - AdmissionMap sync.Map - CacheConfig *cache.Config - EventWriter *kubeprovider.EventWriter - maxParallel int - ScriptRunner ScriptRunner - Auth *RollingUpgradeAuthenticator - DrainGroupMapper *sync.Map - DrainErrorMapper *sync.Map - ClusterNodesMap *sync.Map - ReconcileMap *sync.Map + Scheme *runtime.Scheme + AdmissionMap sync.Map + CacheConfig *cache.Config + EventWriter *kubeprovider.EventWriter + maxParallel int + ScriptRunner ScriptRunner + Auth *RollingUpgradeAuthenticator + DrainGroupMapper *sync.Map + DrainErrorMapper *sync.Map + ClusterNodesMap *sync.Map + ReconcileMap *sync.Map + DrainTimeout int + IgnoreDrainFailures bool } // RollingUpgradeAuthenticator has the clients for providers @@ -166,6 +168,9 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque c.ClusterNodes = r.getClusterNodes() return c }(), + + DrainTimeout: r.DrainTimeout, + IgnoreDrainFailures: r.IgnoreDrainFailures, } // process node rotation diff --git a/controllers/upgrade.go b/controllers/upgrade.go index 984738ef..afc3725f 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -54,12 +54,14 @@ type DrainManager struct { type RollingUpgradeContext struct { logr.Logger - ScriptRunner ScriptRunner - Auth *RollingUpgradeAuthenticator - Cloud *DiscoveredState - RollingUpgrade *v1alpha1.RollingUpgrade - DrainManager *DrainManager - metricsMutex *sync.Mutex + ScriptRunner ScriptRunner + Auth *RollingUpgradeAuthenticator + Cloud *DiscoveredState + RollingUpgrade *v1alpha1.RollingUpgrade + DrainManager *DrainManager + metricsMutex *sync.Mutex + DrainTimeout int + IgnoreDrainFailures bool } func (r *RollingUpgradeContext) RotateNodes() error { @@ -249,6 +251,24 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) ) r.DrainManager.DrainGroup.Add(1) + // Determine IgnoreDrainFailure and DrainTimeout values. CR spec takes the precedence. + var ( + drainTimeout int + ignoreDrainFailures bool + ) + if r.RollingUpgrade.DrainTimeout() == nil { + drainTimeout = r.DrainTimeout + } else { + drainTimeout = *r.RollingUpgrade.DrainTimeout() + } + + if r.RollingUpgrade.IsIgnoreDrainFailures() == nil { + ignoreDrainFailures = r.IgnoreDrainFailures + } else { + ignoreDrainFailures = *r.RollingUpgrade.IsIgnoreDrainFailures() + } + + // Drain the nodes in parallel go func() { defer r.DrainManager.DrainGroup.Done() @@ -267,9 +287,9 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) // Turns onto NodeRotationDrain r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain) - if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil { + if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), drainTimeout, r.Auth.Kubernetes); err != nil { // ignore drain failures if either of spec or controller args have set ignoreDrainFailures to true. - if !r.RollingUpgrade.IsIgnoreDrainFailures() { + if !ignoreDrainFailures { r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error()) return } diff --git a/controllers/upgrade_test.go b/controllers/upgrade_test.go index d37f35c5..4cd59c60 100644 --- a/controllers/upgrade_test.go +++ b/controllers/upgrade_test.go @@ -45,7 +45,7 @@ func TestDrainNode(t *testing.T) { err := rollupCtx.Auth.DrainNode( test.Node, time.Duration(rollupCtx.RollingUpgrade.PostDrainDelaySeconds()), - rollupCtx.RollingUpgrade.DrainTimeout(), + 900, rollupCtx.Auth.Kubernetes, ) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { @@ -112,7 +112,7 @@ func TestRunCordonOrUncordon(t *testing.T) { Out: os.Stdout, ErrOut: os.Stdout, DeleteEmptyDirData: true, - Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, + Timeout: 900, } err := drain.RunCordonOrUncordon(helper, test.Node, test.Cordon) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { @@ -163,7 +163,7 @@ func TestRunDrainNode(t *testing.T) { Out: os.Stdout, ErrOut: os.Stdout, DeleteEmptyDirData: true, - Timeout: time.Duration(rollupCtx.RollingUpgrade.Spec.Strategy.DrainTimeout) * time.Second, + Timeout: 900, } err := drain.RunNodeDrain(helper, test.Node.Name) if (test.ExpectError && err == nil) || (!test.ExpectError && err != nil) { diff --git a/main.go b/main.go index 1cd35a51..0dc2ea95 100644 --- a/main.go +++ b/main.go @@ -85,6 +85,8 @@ func main() { maxAPIRetries int debugMode bool logMode string + drainTimeout int + ignoreDrainFailures bool ) flag.BoolVar(&debugMode, "debug", false, "enable debug logging") @@ -96,6 +98,8 @@ func main() { flag.StringVar(&namespace, "namespace", "", "The namespace in which to watch objects") flag.IntVar(&maxParallel, "max-parallel", 10, "The max number of parallel rolling upgrades") flag.IntVar(&maxAPIRetries, "max-api-retries", 12, "The number of maximum retries for failed/rate limited AWS API calls") + flag.IntVar(&drainTimeout, "drain-timeout", 900, "when the drain command should timeout") + flag.BoolVar(&ignoreDrainFailures, "ignore-drain-failures", false, "proceed with instance termination despite drain failures.") opts := zap.Options{ Development: true, @@ -197,10 +201,12 @@ func main() { ScriptRunner: controllers.ScriptRunner{ Logger: logger, }, - DrainGroupMapper: &sync.Map{}, - DrainErrorMapper: &sync.Map{}, - ClusterNodesMap: &sync.Map{}, - ReconcileMap: &sync.Map{}, + DrainGroupMapper: &sync.Map{}, + DrainErrorMapper: &sync.Map{}, + ClusterNodesMap: &sync.Map{}, + ReconcileMap: &sync.Map{}, + DrainTimeout: drainTimeout, + IgnoreDrainFailures: ignoreDrainFailures, } reconciler.SetMaxParallel(maxParallel) From 4c77ae97c27afc782a7d5a5c8f5ab526726f05a3 Mon Sep 17 00:00:00 2001 From: sbadiger Date: Thu, 30 Sep 2021 12:11:46 -0700 Subject: [PATCH 2/3] add a check for pointer Signed-off-by: sbadiger --- api/v1alpha1/rollingupgrade_types.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/api/v1alpha1/rollingupgrade_types.go b/api/v1alpha1/rollingupgrade_types.go index de9a7690..53e359be 100644 --- a/api/v1alpha1/rollingupgrade_types.go +++ b/api/v1alpha1/rollingupgrade_types.go @@ -372,12 +372,13 @@ func (r *RollingUpgrade) Validate() (bool, error) { } // validating the DrainTimeout value - if *strategy.DrainTimeout == 0 { - *r.Spec.Strategy.DrainTimeout = -1 - } else if *strategy.DrainTimeout < -1 { - err := fmt.Errorf("%s: Invalid value for startegy DrainTimeout - %d", r.Name, strategy.MaxUnavailable.IntVal) - return false, err + if strategy.DrainTimeout != nil { + if *strategy.DrainTimeout == 0 { + *r.Spec.Strategy.DrainTimeout = -1 + } else if *strategy.DrainTimeout < -1 { + err := fmt.Errorf("%s: Invalid value for startegy DrainTimeout - %d", r.Name, strategy.MaxUnavailable.IntVal) + return false, err + } } - return true, nil } From 9e4fa1860ba22a7847c9e0288c564be1be4d0e77 Mon Sep 17 00:00:00 2001 From: sbadiger Date: Fri, 1 Oct 2021 13:48:10 -0700 Subject: [PATCH 3/3] add test cases Signed-off-by: sbadiger --- controllers/upgrade_test.go | 69 +++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/controllers/upgrade_test.go b/controllers/upgrade_test.go index 4cd59c60..a74665d8 100644 --- a/controllers/upgrade_test.go +++ b/controllers/upgrade_test.go @@ -448,3 +448,72 @@ func TestSetBatchStandBy(t *testing.T) { } } } + +func TestIgnoreDrainFailuresAndDrainTimeout(t *testing.T) { + var tests = []struct { + TestDescription string + Reconciler *RollingUpgradeReconciler + RollingUpgrade *v1alpha1.RollingUpgrade + AsgClient *MockAutoscalingGroup + ClusterNodes []*corev1.Node + ExpectedStatusValue string + }{ + { + "CR spec has IgnoreDrainFailures as nil, so default false should be considered", + createRollingUpgradeReconciler(t), + createRollingUpgrade(), + createASGClient(), + createNodeSlice(), + v1alpha1.StatusComplete, + }, + { + "CR spec has IgnoreDrainFailures as true, so default false should not be considered", + createRollingUpgradeReconciler(t), + func() *v1alpha1.RollingUpgrade { + rollingUpgrade := createRollingUpgrade() + ignoreDrainFailuresValue := true + rollingUpgrade.Spec.IgnoreDrainFailures = &ignoreDrainFailuresValue + return rollingUpgrade + }(), + createASGClient(), + createNodeSlice(), + v1alpha1.StatusComplete, + }, + { + "CR spec has DrainTimeout as nil, so default value of 900 should be considered", + createRollingUpgradeReconciler(t), + createRollingUpgrade(), + createASGClient(), + createNodeSlice(), + v1alpha1.StatusComplete, + }, + { + "CR spec has DrainTimeout as 1800, so default value of 900 should not be considered", + createRollingUpgradeReconciler(t), + func() *v1alpha1.RollingUpgrade { + rollingUpgrade := createRollingUpgrade() + drainTimeoutValue := 1800 + rollingUpgrade.Spec.Strategy.DrainTimeout = &drainTimeoutValue + return rollingUpgrade + }(), + createASGClient(), + createNodeSlice(), + v1alpha1.StatusComplete, + }, + } + for _, test := range tests { + rollupCtx := createRollingUpgradeContext(test.Reconciler) + rollupCtx.RollingUpgrade = test.RollingUpgrade + rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups + rollupCtx.Cloud.ClusterNodes = test.ClusterNodes + rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient + + err := rollupCtx.RotateNodes() + if err != nil { + t.Errorf("Test Description: %s \n error: %v", test.TestDescription, err) + } + if rollupCtx.RollingUpgrade.CurrentStatus() != test.ExpectedStatusValue { + t.Errorf("Test Description: %s \n expected value: %s, actual value: %s", test.TestDescription, test.ExpectedStatusValue, rollupCtx.RollingUpgrade.CurrentStatus()) + } + } +}