From 2395d2dec357c0bad4665588015b04c8fda20b5f Mon Sep 17 00:00:00 2001 From: sbadiger Date: Tue, 17 Aug 2021 12:25:43 -0700 Subject: [PATCH] Add ignoreDrainFailure and DrainTimeout as controller arguements Signed-off-by: sbadiger --- controllers/common/utils.go | 7 ++++++ controllers/rollingupgrade_controller.go | 27 ++++++++++++++---------- controllers/upgrade.go | 13 ++++++++---- main.go | 7 ++++++ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/controllers/common/utils.go b/controllers/common/utils.go index 4b67ce3f..06af48d6 100644 --- a/controllers/common/utils.go +++ b/controllers/common/utils.go @@ -29,3 +29,10 @@ func ContainsEqualFold(slice []string, s string) bool { } return false } + +func IntMax(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index b57dbd39..e8372010 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -42,17 +42,19 @@ import ( type RollingUpgradeReconciler struct { client.Client logr.Logger - Scheme *runtime.Scheme - AdmissionMap sync.Map - CacheConfig *cache.Config - EventWriter *kubeprovider.EventWriter - maxParallel int - ScriptRunner ScriptRunner - Auth *RollingUpgradeAuthenticator - DrainGroupMapper *sync.Map - DrainErrorMapper *sync.Map - ClusterNodesMap *sync.Map - ReconcileMap *sync.Map + Scheme *runtime.Scheme + AdmissionMap sync.Map + CacheConfig *cache.Config + EventWriter *kubeprovider.EventWriter + maxParallel int + ScriptRunner ScriptRunner + Auth *RollingUpgradeAuthenticator + DrainGroupMapper *sync.Map + DrainErrorMapper *sync.Map + ClusterNodesMap *sync.Map + ReconcileMap *sync.Map + DrainTimeout int + IgnoreDrainFailures bool } // RollingUpgradeAuthenticator has the clients for providers @@ -166,6 +168,9 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque c.ClusterNodes = r.getClusterNodes() return c }(), + + DrainTimeout: r.DrainTimeout, + IgnoreDrainFailures: r.IgnoreDrainFailures, } // process node rotation diff --git a/controllers/upgrade.go b/controllers/upgrade.go index ade49b48..04e7d9a2 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -60,6 +60,9 @@ type RollingUpgradeContext struct { RollingUpgrade *v1alpha1.RollingUpgrade DrainManager *DrainManager metricsMutex *sync.Mutex + + DrainTimeout int + IgnoreDrainFailures bool } func (r *RollingUpgradeContext) RotateNodes() error { @@ -114,7 +117,8 @@ func (r *RollingUpgradeContext) RotateNodes() error { func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) (bool, error) { var ( - mode = r.RollingUpgrade.StrategyMode() + mode = r.RollingUpgrade.StrategyMode() + drainTimeout = common.IntMax(r.DrainTimeout, r.RollingUpgrade.DrainTimeout()) ) r.Info("rotating batch", "instances", awsprovider.GetInstanceIDs(batch), "name", r.RollingUpgrade.NamespacedName()) @@ -267,10 +271,11 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) // Turns onto NodeRotationDrain r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain) - if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil { - if !r.RollingUpgrade.IsIgnoreDrainFailures() { + if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), drainTimeout, r.Auth.Kubernetes); err != nil { + // ignore drain failures if either of spec or controller args have set ignoreDrainFailures to true. + if !r.RollingUpgrade.IsIgnoreDrainFailures() && !r.IgnoreDrainFailures { r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error()) - //TODO: BREAK AFTER ERRORS? + return } } } diff --git a/main.go b/main.go index 1cd35a51..e899e44b 100644 --- a/main.go +++ b/main.go @@ -84,10 +84,14 @@ func main() { maxParallel int maxAPIRetries int debugMode bool + drainTimeout int + ignoreDrainFailures bool logMode string ) flag.BoolVar(&debugMode, "debug", false, "enable debug logging") + flag.IntVar(&drainTimeout, "drain-timeout", 900, "when the drain command should timeout") + flag.BoolVar(&ignoreDrainFailures, "ignore-drain-failures", false, "proceed with instance termination despite drain failures.") flag.StringVar(&logMode, "log-format", "text", "Log mode: supported values: text, json.") flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -201,6 +205,9 @@ func main() { DrainErrorMapper: &sync.Map{}, ClusterNodesMap: &sync.Map{}, ReconcileMap: &sync.Map{}, + + DrainTimeout: drainTimeout, + IgnoreDrainFailures: ignoreDrainFailures, } reconciler.SetMaxParallel(maxParallel)