From fcd1699c1829e85bc1011ef94cfb034e944fb00c Mon Sep 17 00:00:00 2001
From: Shreyas Badiger <7680410+shreyas-badiger@users.noreply.github.com>
Date: Wed, 23 Jun 2021 19:25:09 -0700
Subject: [PATCH 1/8] upgrade-manager-v2: remove function duplicate
 declaration. (#266)

* and CR end time

Signed-off-by: sbadiger

* expose totalProcessing time and other metrics

Signed-off-by: sbadiger

* addressing review comments

Signed-off-by: sbadiger

* remove function duplication

Signed-off-by: sbadiger
---
 controllers/upgrade.go | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/controllers/upgrade.go b/controllers/upgrade.go
index a5c12695..5312c6a5 100644
--- a/controllers/upgrade.go
+++ b/controllers/upgrade.go
@@ -578,18 +578,3 @@ func (r *RollingUpgradeContext) endTimeUpdate() {
 		common.TotalProcessingTime(r.RollingUpgrade.ScalingGroupName(), totalProcessingTime)
 	}
 }
-
-func (r *RollingUpgradeContext) endTimeUpdate() {
-	//set end time
-	r.RollingUpgrade.SetEndTime(time.Now().Format(time.RFC3339))
-
-	//set total processing time
-	startTime, err1 := time.Parse(time.RFC3339, r.RollingUpgrade.StartTime())
-	endTime, err2 := time.Parse(time.RFC3339, r.RollingUpgrade.EndTime())
-	if err1 != nil || err2 != nil {
-		r.Info("failed to calculate totalProcessingTime")
-	} else {
-		var totalProcessingTime = endTime.Sub(startTime)
-		r.RollingUpgrade.SetTotalProcessingTime(totalProcessingTime.String())
-	}
-}

From 1403182f9a497a30ad8f66efc09eff8077af96d8 Mon Sep 17 00:00:00 2001
From: Sheldon Shao
Date: Thu, 24 Jun 2021 00:18:28 -0700
Subject: [PATCH 2/8] Carry the metrics status in RollingUpgrade CR (#267)

* Update metrics status at same time

Signed-off-by: xshao

* Update metrics status when terminating instance

Signed-off-by: xshao

* Add terminated step

Signed-off-by: xshao

* Add terminated step

Signed-off-by: xshao

* Add terminated step

Signed-off-by: xshao
Signed-off-by: sbadiger
---
 api/v1alpha1/rollingupgrade_types.go |  2 ++
 controllers/metrics.go               | 21 ++++++++++++++++-----
 controllers/upgrade.go               | 22 ++++++++++++++--------
 3 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/api/v1alpha1/rollingupgrade_types.go b/api/v1alpha1/rollingupgrade_types.go
index 177d9edf..2563964f 100644
--- a/api/v1alpha1/rollingupgrade_types.go
+++ b/api/v1alpha1/rollingupgrade_types.go
@@ -168,6 +168,7 @@ const (
 	NodeRotationPostWait      RollingUpgradeStep = "post_wait"
 	NodeRotationTerminate     RollingUpgradeStep = "terminate"
 	NodeRotationPostTerminate RollingUpgradeStep = "post_terminate"
+	NodeRotationTerminated    RollingUpgradeStep = "terminated"
 	NodeRotationCompleted     RollingUpgradeStep = "completed"
 )

@@ -180,6 +181,7 @@ var NodeRotationStepOrders = map[RollingUpgradeStep]int{
 	NodeRotationPostWait:      60,
 	NodeRotationTerminate:     70,
 	NodeRotationPostTerminate: 80,
+	NodeRotationTerminated:    90,
 	NodeRotationCompleted:     1000,
 }

diff --git a/controllers/metrics.go b/controllers/metrics.go
index e3986111..337b91d2 100644
--- a/controllers/metrics.go
+++ b/controllers/metrics.go
@@ -8,6 +8,12 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

+// UpdateMetricsStatus updates the metrics status
+func (s *RollingUpgradeContext) UpdateMetricsStatus(batchNodes map[string]*v1alpha1.NodeInProcessing, nodeSteps map[string][]v1alpha1.NodeStepDuration) {
+	s.UpdateLastBatchNodes(batchNodes)
+	s.UpdateStatistics(nodeSteps)
+}
+
 // Update last batch nodes
 func (s *RollingUpgradeContext) UpdateLastBatchNodes(batchNodes map[string]*v1alpha1.NodeInProcessing) {
 	s.RollingUpgrade.Status.NodeInProcessing = batchNodes
@@ -62,9 +68,8 @@ func (s *RollingUpgradeContext) ToStepDuration(groupName, nodeName string, stepN
 	}
 }

-// Node turns onto step
-func (s *RollingUpgradeContext) NodeStep(InProcessingNodes map[string]*v1alpha1.NodeInProcessing,
-	nodeSteps map[string][]v1alpha1.NodeStepDuration, groupName, nodeName string, stepName v1alpha1.RollingUpgradeStep) {
+func (s *RollingUpgradeContext) DoNodeStep(InProcessingNodes map[string]*v1alpha1.NodeInProcessing,
+	nodeSteps map[string][]v1alpha1.NodeStepDuration, groupName, nodeName string, stepName v1alpha1.RollingUpgradeStep, endTime metav1.Time) {
 	var inProcessingNode *v1alpha1.NodeInProcessing
 	if n, ok := InProcessingNodes[nodeName]; !ok {
@@ -79,7 +84,7 @@ func (s *RollingUpgradeContext) NodeStep(InProcessingNodes map[string]*v1alpha1.
 		inProcessingNode = n
 	}

-	inProcessingNode.StepEndTime = metav1.Now()
+	inProcessingNode.StepEndTime = endTime
 	var duration = inProcessingNode.StepEndTime.Sub(inProcessingNode.StepStartTime.Time)
 	if stepName == v1alpha1.NodeRotationCompleted {
 		//Add overall and remove the node from in-processing map
@@ -96,13 +101,19 @@ func (s *RollingUpgradeContext) NodeStep(InProcessingNodes map[string]*v1alpha1.
 		var newOrder = v1alpha1.NodeRotationStepOrders[stepName]
 		if newOrder > oldOrder { //Make sure the steps running in order
 			stepDuration := s.ToStepDuration(groupName, nodeName, inProcessingNode.StepName, duration)
-			inProcessingNode.StepStartTime = metav1.Now()
+			inProcessingNode.StepStartTime = endTime
 			inProcessingNode.StepName = stepName
 			s.addNodeStepDuration(nodeSteps, nodeName, stepDuration)
 		}
 	}
 }

+// Node turns onto step
+func (s *RollingUpgradeContext) NodeStep(InProcessingNodes map[string]*v1alpha1.NodeInProcessing,
+	nodeSteps map[string][]v1alpha1.NodeStepDuration, groupName, nodeName string, stepName v1alpha1.RollingUpgradeStep) {
+	s.DoNodeStep(InProcessingNodes, nodeSteps, groupName, nodeName, stepName, metav1.Now())
+}
+
 func (s *RollingUpgradeContext) addNodeStepDuration(steps map[string][]v1alpha1.NodeStepDuration, nodeName string, nsd v1alpha1.NodeStepDuration) {
 	s.metricsMutex.Lock()
 	if stepDuration, ok := steps[nodeName]; !ok {

diff --git a/controllers/upgrade.go b/controllers/upgrade.go
index 5312c6a5..a959702d 100644
--- a/controllers/upgrade.go
+++ b/controllers/upgrade.go
@@ -156,6 +156,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 			r.Info("setting instances to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
 			if err := r.Auth.TagEC2instances(inServiceInstanceIDs, instanceStateTagKey, inProgressTagValue); err != nil {
 				r.Error(err, "failed to set instances to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
+				r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
 				return false, err
 			}
 			// Standby
@@ -164,6 +165,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 				r.Info("failed to set instances to stand-by", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "message", err.Error(), "name", r.RollingUpgrade.NamespacedName())
 			}
 			// requeue until there are no InService instances in the batch
+			r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
 			return true, nil
 		} else {
 			r.Info("no InService instances in the batch", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
@@ -183,6 +185,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 		r.Info("waiting for desired nodes", "name", r.RollingUpgrade.NamespacedName())
 		if !r.DesiredNodesReady() {
 			r.Info("new node is yet to join the cluster", "name", r.RollingUpgrade.NamespacedName())
+			r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
 			return true, nil
 		}
 		r.Info("desired nodes are ready", "name", r.RollingUpgrade.NamespacedName())
@@ -202,6 +205,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 			r.Info("setting batch to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
 			if err := r.Auth.TagEC2instances(inServiceInstanceIDs, instanceStateTagKey, inProgressTagValue); err != nil {
 				r.Error(err, "failed to set batch in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
+				r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
 				return false, err
 			}
 		}
@@ -272,8 +276,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

 	select {
 	case err := <-r.DrainManager.DrainErrors:
-		r.UpdateStatistics(nodeSteps)
-		r.UpdateLastBatchNodes(inProcessingNodes)
+		r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

 		r.Error(err, "failed to rotate the node", "name", r.RollingUpgrade.NamespacedName())
 		return false, err
@@ -303,6 +306,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 				if err := r.Auth.TerminateInstance(target); err != nil {
 					// terminate failures are retryable
 					r.Info("failed to terminate instance", "instance", instanceID, "message", err.Error(), "name", r.RollingUpgrade.NamespacedName())
+					r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)
 					return true, nil
 				}
 				r.RollingUpgrade.SetLastNodeTerminationTime(&metav1.Time{Time: time.Now()})
@@ -315,18 +319,20 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 					return false, err
 				}

-				// Turns onto NodeRotationCompleted
-				r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted)
+				//Calculate the terminating time,
+				terminatedTime := metav1.Time{
+					Time: metav1.Now().Add(time.Duration(r.RollingUpgrade.NodeIntervalSeconds()) * time.Second),
+				}
+				r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminated)
+				r.DoNodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted, terminatedTime)
 			}
 		}
-		r.UpdateStatistics(nodeSteps)
-		r.UpdateLastBatchNodes(inProcessingNodes)
+		r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

 	case <-time.After(DefaultWaitGroupTimeout):
 		// goroutines timed out - requeue
-		r.UpdateStatistics(nodeSteps)
-		r.UpdateLastBatchNodes(inProcessingNodes)
+		r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

 		r.Info("instances are still draining", "name", r.RollingUpgrade.NamespacedName())
 		return true, nil
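
[Editor's sketch] Patch 2's key mechanic is splitting NodeStep (which always stamps metav1.Now()) from DoNodeStep (which takes an explicit endTime): after terminating an instance the controller deliberately waits out nodeIntervalSeconds, so the final "completed" step is recorded with a terminatedTime pushed that interval into the future. The transition itself is guarded by NodeRotationStepOrders so a late or duplicate event can never move a node backwards. Below is a minimal, self-contained Go illustration of that guard — stepOrders mirrors NodeRotationStepOrders from the diff, while node and advance are invustrative names invented here, not repository code:

package main

import (
	"fmt"
	"time"
)

// stepOrders mirrors v1alpha1.NodeRotationStepOrders: a higher value means a later step.
var stepOrders = map[string]int{
	"kickoff": 10, "desired_node_ready": 20, "predrain_script": 30,
	"drain": 40, "postdrain_script": 50, "post_wait": 60,
	"terminate": 70, "post_terminate": 80, "terminated": 90, "completed": 1000,
}

type node struct {
	step  string
	start time.Time
}

// advance moves n to step only if it is later in the order map, mimicking
// the `newOrder > oldOrder` guard in DoNodeStep; end is the explicit
// timestamp that patch 2 threads through instead of always calling Now().
func advance(n *node, step string, end time.Time) {
	if stepOrders[step] > stepOrders[n.step] {
		fmt.Printf("%s took %s\n", n.step, end.Sub(n.start))
		n.step, n.start = step, end
	}
}

func main() {
	start := time.Now()
	n := &node{step: "kickoff", start: start}
	advance(n, "drain", start.Add(30*time.Second))
	advance(n, "kickoff", start.Add(31*time.Second)) // ignored: out of order
	// back-date the completed step by the node interval, as the diff does
	// with terminatedTime after TerminateInstance succeeds
	advance(n, "completed", start.Add(90*time.Second))
}
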
From 37cd508c7efe84f7e193b07395e6fabb88500a7d Mon Sep 17 00:00:00 2001
From: sbadiger
Date: Tue, 29 Jun 2021 10:43:28 -0700
Subject: [PATCH 3/8] move cloud discovery after nodeInterval / drainInterval
 wait

Signed-off-by: sbadiger
---
 controllers/rollingupgrade_controller.go |  7 ------
 controllers/upgrade.go                   | 27 +++++++++++++++---------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go
index e485e10e..48babd4d 100644
--- a/controllers/rollingupgrade_controller.go
+++ b/controllers/rollingupgrade_controller.go
@@ -146,13 +146,6 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 		RollingUpgrade: rollingUpgrade,
 		metricsMutex:   &sync.Mutex{},
 	}
-	rollupCtx.Cloud = NewDiscoveredState(rollupCtx.Auth, rollupCtx.Logger)
-	if err := rollupCtx.Cloud.Discover(); err != nil {
-		r.Info("failed to discover the cloud", "scalingGroup", scalingGroupName, "name", rollingUpgrade.NamespacedName())
-		rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
-		common.SetMetricRollupFailed(rollingUpgrade.Name)
-		return ctrl.Result{}, err
-	}

 	// process node rotation
 	if err := rollupCtx.RotateNodes(); err != nil {

diff --git a/controllers/upgrade.go b/controllers/upgrade.go
index a959702d..4894d0de 100644
--- a/controllers/upgrade.go
+++ b/controllers/upgrade.go
@@ -59,7 +59,6 @@ type RollingUpgradeContext struct {
 	metricsMutex *sync.Mutex
 }

-// TODO: main node rotation logic
 func (r *RollingUpgradeContext) RotateNodes() error {
 	var (
 		lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
@@ -67,16 +66,8 @@ func (r *RollingUpgradeContext) RotateNodes() error {
 		lastDrainTime       = r.RollingUpgrade.LastNodeDrainTime()
 		drainInterval       = r.RollingUpgrade.PostDrainDelaySeconds()
 	)
-	r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
-	common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)
-
-	// set status start time
-	if r.RollingUpgrade.StartTime() == "" {
-		r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
-	}

 	if !lastTerminationTime.IsZero() || !lastDrainTime.IsZero() {
-
 		// Check if we are still waiting on a termination delay
 		if time.Since(lastTerminationTime.Time).Seconds() < float64(nodeInterval) {
 			r.Info("reconcile requeue due to termination interval wait", "name", r.RollingUpgrade.NamespacedName())
@@ -90,6 +81,22 @@ func (r *RollingUpgradeContext) RotateNodes() error {
 		}
 	}

+	// set status start time
+	if r.RollingUpgrade.StartTime() == "" {
+		r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
+	}
+	r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
+	common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)
+
+	// discover the state of AWS and K8s cluster.
+	r.Cloud = NewDiscoveredState(r.Auth, r.Logger)
+	if err := r.Cloud.Discover(); err != nil {
+		r.Info("failed to discover the cloud", "scalingGroup", r.RollingUpgrade.ScalingGroupName(), "name", r.RollingUpgrade.NamespacedName())
+		r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
+		common.SetMetricRollupFailed(r.RollingUpgrade.Name)
+		return err
+	}
+
 	var (
 		scalingGroup = awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups)
 	)
@@ -245,6 +252,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 				if err := r.Auth.DrainNode(&node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil {
 					if !r.RollingUpgrade.IsIgnoreDrainFailures() {
 						r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error())
+						//TODO: BREAK AFTER ERRORS?
 					}
 				}
 			}
@@ -326,7 +334,6 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 			r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminated)
 			r.DoNodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted, terminatedTime)
 		}
-
 		r.UpdateMetricsStatus(inProcessingNodes, nodeSteps)

 	case <-time.After(DefaultWaitGroupTimeout):
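
[Editor's sketch] Patch 3 reorders RotateNodes so the interval checks run before the CR is marked running and before cloud discovery, avoiding both premature status flips and needless AWS describe calls when a requeue is inevitable. A condensed, illustrative sketch of that guard — the function and variable names here are invented for the example, not the repository's:

package main

import (
	"fmt"
	"time"
)

// shouldRequeue models the check patch 3 hoists to the top of RotateNodes:
// if we are still inside the termination or drain interval, requeue
// immediately and skip status updates and cloud discovery entirely.
func shouldRequeue(lastTermination, lastDrain time.Time, nodeInterval, drainInterval int) bool {
	if !lastTermination.IsZero() || !lastDrain.IsZero() {
		if time.Since(lastTermination).Seconds() < float64(nodeInterval) {
			return true // still waiting out nodeIntervalSeconds
		}
		if time.Since(lastDrain).Seconds() < float64(drainInterval) {
			return true // still waiting out postDrainDelaySeconds
		}
	}
	return false
}

func main() {
	lastTermination := time.Now().Add(-30 * time.Second)
	fmt.Println(shouldRequeue(lastTermination, time.Time{}, 300, 60)) // true: only 30s of 300s elapsed
}
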
From edacd18bcb0b46ae446e9d251fdc9c0aec6d0b83 Mon Sep 17 00:00:00 2001
From: sbadiger
Date: Thu, 1 Jul 2021 13:04:51 -0700
Subject: [PATCH 4/8] Add watch event for cluster nodes instead of API calls

Signed-off-by: sbadiger
---
 config/rbac/role.yaml                     |   1 +
 controllers/cloud.go                      |   8 +-
 controllers/helpers_test.go               | 128 ++++++++++++++++++++--
 controllers/providers/kubernetes/utils.go |  15 ++-
 controllers/rollingupgrade_controller.go  |  35 +++++-
 controllers/upgrade.go                    |  20 ++--
 controllers/upgrade_test.go               |  53 +++------
 main.go                                   |   1 +
 8 files changed, 185 insertions(+), 76 deletions(-)

diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 5c4ea57f..b01753b3 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -35,6 +35,7 @@ rules:
   - get
   - list
   - patch
+  - watch
- apiGroups:
  - ""
  resources:

diff --git a/controllers/cloud.go b/controllers/cloud.go
index 2fea2bd7..dc19f72d 100644
--- a/controllers/cloud.go
+++ b/controllers/cloud.go
@@ -34,7 +34,7 @@ var (
 type DiscoveredState struct {
 	*RollingUpgradeAuthenticator
 	logr.Logger
-	ClusterNodes        *corev1.NodeList
+	ClusterNodes        []*corev1.Node
 	LaunchTemplates     []*ec2.LaunchTemplate
 	ScalingGroups       []*autoscaling.Group
 	InProgressInstances []string
@@ -67,11 +67,5 @@ func (d *DiscoveredState) Discover() error {
 	}
 	d.InProgressInstances = inProgressInstances

-	nodes, err := d.KubernetesClientSet.ListClusterNodes()
-	if err != nil || nodes == nil || nodes.Size() == 0 {
-		return errors.Wrap(err, "failed to discover cluster nodes")
-	}
-	d.ClusterNodes = nodes
-
 	return nil
 }
diff --git a/controllers/helpers_test.go b/controllers/helpers_test.go
index 3faeae5b..50fe6cfa 100644
--- a/controllers/helpers_test.go
+++ b/controllers/helpers_test.go
@@ -1,6 +1,7 @@
 package controllers

 import (
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -125,9 +126,17 @@ func createNodeList() *corev1.NodeList {
 	}
 }

-func createNode() *corev1.Node {
+func createNodeSlice() []*corev1.Node {
+	return []*corev1.Node{
+		createNode("mock-node-1"),
+		createNode("mock-node-2"),
+		createNode("mock-node-3"),
+	}
+}
+
+func createNode(name string) *corev1.Node {
 	return &corev1.Node{
-		ObjectMeta: metav1.ObjectMeta{Name: "mock-node-1", Namespace: "default"},
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
 		Spec:       corev1.NodeSpec{ProviderID: "foo-bar/mock-instance-1"},
 		Status: corev1.NodeStatus{
 			Conditions: []corev1.NodeCondition{
@@ -140,18 +149,27 @@ func createNode() *corev1.Node {
 // AWS
 type MockAutoscalingGroup struct {
 	autoscalingiface.AutoScalingAPI
-	errorFlag         bool
-	awsErr            awserr.Error
-	errorInstanceId   string
-	autoScalingGroups []*autoscaling.Group
+	errorFlag            bool
+	awsErr               awserr.Error
+	errorInstanceId      string
+	autoScalingGroups    []*autoscaling.Group
+	Groups               map[string]*autoscaling.Group
+	LaunchConfigurations map[string]*autoscaling.LaunchConfiguration
 }

+type launchTemplateInfo struct {
+	data *ec2.ResponseLaunchTemplateData
+	name *string
+}
 type MockEC2 struct {
 	ec2iface.EC2API
-	awsErr       awserr.Error
-	reservations []*ec2.Reservation
+	awsErr          awserr.Error
+	reservations    []*ec2.Reservation
+	LaunchTemplates map[string]*launchTemplateInfo
 }

+var _ ec2iface.EC2API = &MockEC2{}
+
 func createASGInstance(instanceID string, launchConfigName string) *autoscaling.Instance {
 	return &autoscaling.Instance{
 		InstanceId:              &instanceID,
@@ -188,8 +206,8 @@ func createASGClient() *MockAutoscalingGroup {
 	}
 }

-func createEc2Client() MockEC2 {
-	return MockEC2{}
+func createEc2Client() *MockEC2 {
+	return &MockEC2{}
 }

 func createAmazonClient(t *testing.T) *awsprovider.AmazonClientSet {
@@ -199,6 +217,8 @@ func createAmazonClient(t *testing.T) *awsprovider.AmazonClientSet {
 	}
 }

+/******************************* AWS MOCKS *******************************/
+
 func (mockAutoscalingGroup MockAutoscalingGroup) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) {
 	output := &autoscaling.TerminateInstanceInAutoScalingGroupOutput{}
 	if mockAutoscalingGroup.errorFlag {
@@ -213,3 +233,91 @@ func (mockAutoscalingGroup MockAutoscalingGroup) TerminateInstanceInAutoScalingG
 	output.Activity = &asgChange
 	return output, nil
 }
+
+// DescribeLaunchTemplatesPages mocks the describing the launch templates
+func (m *MockEC2) DescribeLaunchTemplatesPages(request *ec2.DescribeLaunchTemplatesInput, callback func(*ec2.DescribeLaunchTemplatesOutput, bool) bool) error {
+	page, err := m.DescribeLaunchTemplates(request)
+	if err != nil {
+		return err
+	}
+
+	callback(page, false)
+
+	return nil
+}
+
+// DescribeLaunchTemplates mocks the describing the launch templates
+func (m *MockEC2) DescribeLaunchTemplates(request *ec2.DescribeLaunchTemplatesInput) (*ec2.DescribeLaunchTemplatesOutput, error) {
+
+	o := &ec2.DescribeLaunchTemplatesOutput{}
+
+	if m.LaunchTemplates == nil {
+		return o, nil
+	}
+
+	for id, ltInfo := range m.LaunchTemplates {
+		launchTemplateName := aws.StringValue(ltInfo.name)
+
+		allFiltersMatch := true
+		for _, filter := range request.Filters {
+			filterName := aws.StringValue(filter.Name)
+			filterValue := aws.StringValue(filter.Values[0])
+
+			filterMatches := false
+			if filterName == "tag:Name" && filterValue == launchTemplateName {
+				filterMatches = true
+			}
+			if strings.HasPrefix(filterName, "tag:kubernetes.io/cluster/") {
+				filterMatches = true
+			}
+
+			if !filterMatches {
+				allFiltersMatch = false
+				break
+			}
+		}
+
+		if allFiltersMatch {
+			o.LaunchTemplates = append(o.LaunchTemplates, &ec2.LaunchTemplate{
+				LaunchTemplateName: aws.String(launchTemplateName),
+				LaunchTemplateId:   aws.String(id),
+			})
+		}
+	}
+
+	return o, nil
+}
+
+func (m *MockAutoscalingGroup) DescribeAutoScalingGroupsPages(request *autoscaling.DescribeAutoScalingGroupsInput, callback func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error {
+	// For the mock, we just send everything in one page
+	page, err := m.DescribeAutoScalingGroups(request)
+	if err != nil {
+		return err
+	}
+
+	callback(page, false)
+
+	return nil
+}
+
+func (m *MockAutoscalingGroup) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) {
+	return &autoscaling.DescribeAutoScalingGroupsOutput{
+		AutoScalingGroups: createASGs(),
+	}, nil
+}
+
+func (m *MockEC2) DescribeInstancesPages(request *ec2.DescribeInstancesInput, callback func(*ec2.DescribeInstancesOutput, bool) bool) error {
+	// For the mock, we just send everything in one page
+	page, err := m.DescribeInstances(request)
+	if err != nil {
+		return err
+	}
+
+	callback(page, false)
+
+	return nil
+}
+
+func (m *MockEC2) DescribeInstances(*ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
+	return &ec2.DescribeInstancesOutput{}, nil
+}

diff --git a/controllers/providers/kubernetes/utils.go b/controllers/providers/kubernetes/utils.go
index 997d2709..2e557416 100644
--- a/controllers/providers/kubernetes/utils.go
+++ b/controllers/providers/kubernetes/utils.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"os"
 	"os/user"
-	"reflect"
 	"strings"

 	corev1 "k8s.io/api/core/v1"
@@ -87,20 +86,20 @@ func GetKubernetesLocalConfig() (*rest.Config, error) {
 	return config, nil
 }

-func SelectNodeByInstanceID(instanceID string, nodes *corev1.NodeList) corev1.Node {
+func SelectNodeByInstanceID(instanceID string, nodes []*corev1.Node) *corev1.Node {
 	if nodes != nil {
-		for _, node := range nodes.Items {
+		for _, node := range nodes {
 			nodeID := GetNodeInstanceID(node)
 			if strings.EqualFold(instanceID, nodeID) {
 				return node
 			}
 		}
 	}
-	return corev1.Node{}
+	return nil
 }

-func GetNodeInstanceID(node corev1.Node) string {
-	if !reflect.DeepEqual(node, &corev1.Node{}) {
+func GetNodeInstanceID(node *corev1.Node) string {
+	if node != nil {
 		tokens := strings.Split(node.Spec.ProviderID, "/")
 		nodeInstanceID := tokens[len(tokens)-1]
 		return nodeInstanceID
@@ -108,7 +107,7 @@ func GetNodeInstanceID(node corev1.Node) string {
 	return ""
 }

-func IsNodeReady(node corev1.Node) bool {
+func IsNodeReady(node *corev1.Node) bool {
 	for _, condition := range node.Status.Conditions {
 		if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
 			return true
@@ -117,7 +116,7 @@ func IsNodeReady(node corev1.Node) bool {
 	return false
 }

-func IsNodePassesReadinessGates(node corev1.Node, requiredReadinessGates []v1alpha1.NodeReadinessGate) bool {
+func IsNodePassesReadinessGates(node *corev1.Node, requiredReadinessGates []v1alpha1.NodeReadinessGate) bool {
 	if len(requiredReadinessGates) == 0 {
 		return true
 	}

diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go
index 48babd4d..0e3db1a5 100644
--- a/controllers/rollingupgrade_controller.go
+++ b/controllers/rollingupgrade_controller.go
@@ -26,12 +26,15 @@ import (
 	"github.com/keikoproj/upgrade-manager/controllers/common"
 	awsprovider "github.com/keikoproj/upgrade-manager/controllers/providers/aws"
 	kubeprovider "github.com/keikoproj/upgrade-manager/controllers/providers/kubernetes"
+	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 )

 // RollingUpgradeReconciler reconciles a RollingUpgrade object
@@ -47,6 +50,7 @@ type RollingUpgradeReconciler struct {
 	Auth             *RollingUpgradeAuthenticator
 	DrainGroupMapper *sync.Map
 	DrainErrorMapper *sync.Map
+	ClusterNodesMap  *sync.Map
 }

 type RollingUpgradeAuthenticator struct {
@@ -56,7 +60,7 @@ type RollingUpgradeAuthenticator struct {

 // +kubebuilder:rbac:groups=upgrademgr.keikoproj.io,resources=rollingupgrades,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=upgrademgr.keikoproj.io,resources=rollingupgrades/status,verbs=get;update;patch
-// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;patch
+// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;patch;watch
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=list
 // +kubebuilder:rbac:groups=core,resources=events,verbs=create
 // +kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
@@ -145,6 +149,13 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 		},
 		RollingUpgrade: rollingUpgrade,
 		metricsMutex:   &sync.Mutex{},
+
+		// discover the K8s cluster at controller level through watch
+		Cloud: func() *DiscoveredState {
+			var c = NewDiscoveredState(r.Auth, r.Logger)
+			c.ClusterNodes = r.getClusterNodes()
+			return c
+		}(),
 	}

 	// process node rotation
@@ -161,6 +172,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.RollingUpgrade{}).
+		Watches(&source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.nodeReconciler)).
 		WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel}).
 		Complete(r)
 }
@@ -177,3 +189,24 @@ func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.Rolling
 		r.Info("failed to update status", "message", err.Error(), "name", rollingUpgrade.NamespacedName())
 	}
 }
+
+func (r *RollingUpgradeReconciler) nodeReconciler(obj client.Object) []ctrl.Request {
+	var nodeName = obj.GetName()
+	r.Info("nodeReconciler", "nodeName", nodeName)
+	r.ClusterNodesMap.Store(nodeName, obj.(*corev1.Node))
+	return nil
+}
+
+func (r *RollingUpgradeReconciler) getClusterNodes() []*corev1.Node {
+	var clusterNodes []*corev1.Node
+
+	m := map[string]interface{}{}
+	r.ClusterNodesMap.Range(func(key, value interface{}) bool {
+		m[fmt.Sprint(key)] = value
+		return true
+	})
+	for _, value := range m {
+		clusterNodes = append(clusterNodes, value.(*corev1.Node))
+	}
+	return clusterNodes
+}

diff --git a/controllers/upgrade.go b/controllers/upgrade.go
index 4894d0de..d7a92bc3 100644
--- a/controllers/upgrade.go
+++ b/controllers/upgrade.go
@@ -60,6 +60,14 @@ type RollingUpgradeContext struct {
 }

 func (r *RollingUpgradeContext) RotateNodes() error {
+
+	// set status start time
+	if r.RollingUpgrade.StartTime() == "" {
+		r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
+	}
+	r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
+	common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)
+
 	var (
 		lastTerminationTime = r.RollingUpgrade.LastNodeTerminationTime()
 		nodeInterval        = r.RollingUpgrade.NodeIntervalSeconds()
@@ -81,15 +89,7 @@ func (r *RollingUpgradeContext) RotateNodes() error {
 		}
 	}

-	// set status start time
-	if r.RollingUpgrade.StartTime() == "" {
-		r.RollingUpgrade.SetStartTime(time.Now().Format(time.RFC3339))
-	}
-	r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning)
-	common.SetMetricRollupInitOrRunning(r.RollingUpgrade.Name)
-
 	// discover the state of AWS and K8s cluster.
-	r.Cloud = NewDiscoveredState(r.Auth, r.Logger)
 	if err := r.Cloud.Discover(); err != nil {
 		r.Info("failed to discover the cloud", "scalingGroup", r.RollingUpgrade.ScalingGroupName(), "name", r.RollingUpgrade.NamespacedName())
 		r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
@@ -249,7 +249,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 				// Turns onto NodeRotationDrain
 				r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain)

-				if err := r.Auth.DrainNode(&node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil {
+				if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), r.RollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil {
 					if !r.RollingUpgrade.IsIgnoreDrainFailures() {
 						r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error())
 						//TODO: BREAK AFTER ERRORS?
@@ -522,7 +522,7 @@ func (r *RollingUpgradeContext) DesiredNodesReady() bool {

 	// wait for desired nodes
 	if r.Cloud.ClusterNodes != nil && !reflect.DeepEqual(r.Cloud.ClusterNodes, &corev1.NodeList{}) {
-		for _, node := range r.Cloud.ClusterNodes.Items {
+		for _, node := range r.Cloud.ClusterNodes {
 			instanceID := kubeprovider.GetNodeInstanceID(node)
 			if common.ContainsEqualFold(inServiceInstanceIDs, instanceID) && kubeprovider.IsNodeReady(node) && kubeprovider.IsNodePassesReadinessGates(node, r.RollingUpgrade.Spec.ReadinessGates) {
 				readyNodes++

diff --git a/controllers/upgrade_test.go b/controllers/upgrade_test.go
index ee96189b..794ff68d 100644
--- a/controllers/upgrade_test.go
+++ b/controllers/upgrade_test.go
@@ -6,7 +6,6 @@ import (

 	drain "k8s.io/kubectl/pkg/drain"

-	"reflect"
 	"time"

 	corev1 "k8s.io/api/core/v1"
@@ -17,32 +16,6 @@ import (
 	"github.com/keikoproj/upgrade-manager/api/v1alpha1"
 )

-func TestListClusterNodes(t *testing.T) {
-	var tests = []struct {
-		TestDescription string
-		Reconciler      *RollingUpgradeReconciler
-		Node            *corev1.Node
-		ExpectError     bool
-	}{
-		{
-			"List cluster should succeed",
-			createRollingUpgradeReconciler(t),
-			createNode(),
-			false,
-		},
-	}
-
-	for _, test := range tests {
-		rollupCtx := createRollingUpgradeContext(test.Reconciler)
-
-		actual, err := rollupCtx.Auth.ListClusterNodes()
-		expected := createNodeList()
-		if err != nil || !reflect.DeepEqual(actual, expected) {
-			t.Errorf("ListClusterNodes fail %v", err)
-		}
-	}
-}
-
 // This test checks implementation of our DrainNode which does both cordon + drain
 func TestDrainNode(t *testing.T) {
 	var tests = []struct {
@@ -54,7 +27,7 @@ func TestDrainNode(t *testing.T) {
 		{
 			"Drain should succeed as node is registered with fakeClient",
 			createRollingUpgradeReconciler(t),
-			createNode(),
+			createNode("mock-node-1"),
 			false,
 		},
 		{
@@ -92,7 +65,7 @@ func TestRunCordonOrUncordon(t *testing.T) {
 		{
 			"Cordon should succeed as node is registered with fakeClient",
 			createRollingUpgradeReconciler(t),
-			createNode(),
+			createNode("mock-node-1"),
 			true,
 			false,
 		},
@@ -107,7 +80,7 @@ func TestRunCordonOrUncordon(t *testing.T) {
 			"Uncordon should succeed as node is registered with fakeClient",
 			createRollingUpgradeReconciler(t),
 			func() *corev1.Node {
-				node := createNode()
+				node := createNode("mock-node-1")
 				node.Spec.Unschedulable = true
 				return node
 			}(),
@@ -166,7 +139,7 @@ func TestRunDrainNode(t *testing.T) {
 		{
 			"Drain should succeed as node is registered with fakeClient",
 			createRollingUpgradeReconciler(t),
-			createNode(),
+			createNode("mock-node-1"),
 			false,
 		},
 		// This test should fail, create an upstream ticket.
@@ -319,14 +292,14 @@ func TestDesiredNodesReady(t *testing.T) {
 		TestDescription string
 		Reconciler      *RollingUpgradeReconciler
 		AsgClient       *MockAutoscalingGroup
-		ClusterNodes    *corev1.NodeList
+		ClusterNodes    []*corev1.Node
 		ExpectedValue   bool
 	}{
 		{
 			"Desired nodes are ready",
 			createRollingUpgradeReconciler(t),
 			createASGClient(),
-			createNodeList(),
+			createNodeSlice(),
 			true,
 		},
 		{
@@ -337,23 +310,23 @@ func TestDesiredNodesReady(t *testing.T) {
 				newAsgClient.autoScalingGroups[0].DesiredCapacity = func(x int) *int64 { i := int64(x); return &i }(4)
 				return newAsgClient
 			}(),
-			createNodeList(),
+			createNodeSlice(),
 			false,
 		},
 		{
 			"None of the nodes are ready (desiredCount != readyCount)",
 			createRollingUpgradeReconciler(t),
 			createASGClient(),
-			func() *corev1.NodeList {
-				var nodeList = &corev1.NodeList{Items: []corev1.Node{}}
+			func() []*corev1.Node {
+				var nodeSlice []*corev1.Node
 				for i := 0; i < 3; i++ {
-					node := createNode()
+					node := createNode("mock-node-1")
 					node.Status.Conditions = []corev1.NodeCondition{
 						{Type: corev1.NodeReady, Status: corev1.ConditionFalse},
 					}
-					nodeList.Items = append(nodeList.Items, *node)
+					nodeSlice = append(nodeSlice, node)
 				}
-				return nodeList
+				return nodeSlice
 			}(),
 			false,
 		},
@@ -369,7 +342,7 @@ func TestDesiredNodesReady(t *testing.T) {
 				}
 				return newAsgClient
 			}(),
-			createNodeList(),
+			createNodeSlice(),
 			false,
 		},
 	}

diff --git a/main.go b/main.go
index f6efe702..88a0e58f 100644
--- a/main.go
+++ b/main.go
@@ -199,6 +199,7 @@ func main() {
 		},
 		DrainGroupMapper: &sync.Map{},
 		DrainErrorMapper: &sync.Map{},
+		ClusterNodesMap:  &sync.Map{},
 	}

 	reconciler.SetMaxParallel(maxParallel)
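
[Editor's sketch] The heart of patch 4 is swapping the per-reconcile ListClusterNodes API call for a controller-level cache fed by a node watch: events land in ClusterNodesMap (a sync.Map) and each reconcile takes a point-in-time snapshot via getClusterNodes. The following is a stripped-down, hypothetical version of that cache showing only the store/range mechanics; it assumes the k8s.io/api and k8s.io/apimachinery modules are available, and the nodeCache type is invented for the example:

package main

import (
	"fmt"
	"sync"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// nodeCache mirrors the ClusterNodesMap idea: watch events write here,
// and each reconcile snapshots it instead of listing nodes from the API.
type nodeCache struct{ m sync.Map }

func (c *nodeCache) store(n *corev1.Node) { c.m.Store(n.Name, n) }
func (c *nodeCache) delete(name string)   { c.m.Delete(name) }

// snapshot is the analogue of getClusterNodes in the diff.
func (c *nodeCache) snapshot() []*corev1.Node {
	var nodes []*corev1.Node
	c.m.Range(func(_, v interface{}) bool {
		nodes = append(nodes, v.(*corev1.Node))
		return true
	})
	return nodes
}

func main() {
	c := &nodeCache{}
	c.store(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}})
	c.store(&corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-b"}})
	c.delete("node-a") // a delete event evicts the node
	fmt.Println(len(c.snapshot())) // 1
}
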
createNode("mock-node-1"), false, }, // This test should fail, create an upstream ticket. @@ -319,14 +292,14 @@ func TestDesiredNodesReady(t *testing.T) { TestDescription string Reconciler *RollingUpgradeReconciler AsgClient *MockAutoscalingGroup - ClusterNodes *corev1.NodeList + ClusterNodes []*corev1.Node ExpectedValue bool }{ { "Desired nodes are ready", createRollingUpgradeReconciler(t), createASGClient(), - createNodeList(), + createNodeSlice(), true, }, { @@ -337,23 +310,23 @@ func TestDesiredNodesReady(t *testing.T) { newAsgClient.autoScalingGroups[0].DesiredCapacity = func(x int) *int64 { i := int64(x); return &i }(4) return newAsgClient }(), - createNodeList(), + createNodeSlice(), false, }, { "None of the nodes are ready (desiredCount != readyCount)", createRollingUpgradeReconciler(t), createASGClient(), - func() *corev1.NodeList { - var nodeList = &corev1.NodeList{Items: []corev1.Node{}} + func() []*corev1.Node { + var nodeSlice []*corev1.Node for i := 0; i < 3; i++ { - node := createNode() + node := createNode("mock-node-1") node.Status.Conditions = []corev1.NodeCondition{ {Type: corev1.NodeReady, Status: corev1.ConditionFalse}, } - nodeList.Items = append(nodeList.Items, *node) + nodeSlice = append(nodeSlice, node) } - return nodeList + return nodeSlice }(), false, }, @@ -369,7 +342,7 @@ func TestDesiredNodesReady(t *testing.T) { } return newAsgClient }(), - createNodeList(), + createNodeSlice(), false, }, } diff --git a/main.go b/main.go index f6efe702..88a0e58f 100644 --- a/main.go +++ b/main.go @@ -199,6 +199,7 @@ func main() { }, DrainGroupMapper: &sync.Map{}, DrainErrorMapper: &sync.Map{}, + ClusterNodesMap: &sync.Map{}, } reconciler.SetMaxParallel(maxParallel) From 0598fe43989ef5467216d56860b4f2eb8c71c72d Mon Sep 17 00:00:00 2001 From: sbadiger Date: Thu, 1 Jul 2021 19:30:36 -0700 Subject: [PATCH 5/8] upon node deletion, remove it from syncMap as well Signed-off-by: sbadiger --- controllers/rollingupgrade_controller.go | 22 ++++++++-- controllers/upgrade.go | 55 +++++++++++++++++------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index 0e3db1a5..5f1fa933 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -24,11 +24,13 @@ import ( "github.com/keikoproj/aws-sdk-go-cache/cache" "github.com/keikoproj/upgrade-manager/api/v1alpha1" "github.com/keikoproj/upgrade-manager/controllers/common" + "github.com/keikoproj/upgrade-manager/controllers/common/log" awsprovider "github.com/keikoproj/upgrade-manager/controllers/providers/aws" kubeprovider "github.com/keikoproj/upgrade-manager/controllers/providers/kubernetes" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -191,9 +193,23 @@ func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.Rolling } func (r *RollingUpgradeReconciler) nodeReconciler(obj client.Object) []ctrl.Request { - var nodeName = obj.GetName() - r.Info("nodeReconciler", "nodeName", nodeName) - r.ClusterNodesMap.Store(nodeName, obj.(*corev1.Node)) + var ( + nodeName = obj.GetName() + nodeObj = obj.(*corev1.Node) + ) + + // for a deleted node, delete it from sync Map as well. 
+	var ctx context.Context
+	err := r.Get(ctx, types.NamespacedName{Name: nodeName}, nodeObj)
+	if err != nil {
+		if kerrors.IsNotFound(err) {
+			r.ClusterNodesMap.Delete(nodeName)
+			log.Debug("nodeReconciler[delete] - nodeObj not found, deleted from sync map", "name", nodeName)
+		}
+	} else {
+		log.Debug("nodeReconciler[store]", "nodeName", nodeName)
+		r.ClusterNodesMap.Store(nodeName, obj.(*corev1.Node))
+	}
 	return nil
 }

diff --git a/controllers/upgrade.go b/controllers/upgrade.go
index d7a92bc3..ea09f825 100644
--- a/controllers/upgrade.go
+++ b/controllers/upgrade.go
@@ -147,10 +147,15 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 	switch mode {
 	case v1alpha1.UpdateStrategyModeEager:
 		for _, target := range batch {
+			instanceID := aws.StringValue(target.InstanceId)
+			node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+			if node == nil {
+				r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+				continue
+			}
+
 			var (
-				instanceID = aws.StringValue(target.InstanceId)
-				node       = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
-				nodeName   = node.GetName()
+				nodeName = node.GetName()
 			)
 			//Add statistics
 			r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff)
@@ -180,10 +185,14 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

 	// turns onto desired nodes
 	for _, target := range batch {
+		instanceID := aws.StringValue(target.InstanceId)
+		node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+		if node == nil {
+			r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+			continue
+		}
 		var (
-			instanceID = aws.StringValue(target.InstanceId)
-			node       = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
-			nodeName   = node.GetName()
+			nodeName = node.GetName()
 		)
 		r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDesiredNodeReady)
 	}
@@ -199,10 +208,14 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

 	case v1alpha1.UpdateStrategyModeLazy:
 		for _, target := range batch {
+			instanceID := aws.StringValue(target.InstanceId)
+			node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+			if node == nil {
+				r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+				continue
+			}
 			var (
-				instanceID = aws.StringValue(target.InstanceId)
-				node       = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
-				nodeName   = node.GetName()
+				nodeName = node.GetName()
 			)
 			// add statistics
 			r.NodeStep(inProcessingNodes, nodeSteps, r.RollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff)
@@ -219,9 +232,13 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

 	if reflect.DeepEqual(r.DrainManager.DrainGroup, &sync.WaitGroup{}) {
 		for _, target := range batch {
+			instanceID := aws.StringValue(target.InstanceId)
+			node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+			if node == nil {
+				r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+				continue
+			}
 			var (
-				instanceID   = aws.StringValue(target.InstanceId)
-				node         = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
 				nodeName     = node.GetName()
 				scriptTarget = ScriptTarget{
 					InstanceID:  instanceID,
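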
@@ -243,7 +260,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)

 			// Issue drain concurrently - set lastDrainTime
-			if node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes); !reflect.DeepEqual(node, corev1.Node{}) {
+			if node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes); node != nil {
 				r.Info("draining the node", "instance", instanceID, "node name", node.Name, "name", r.RollingUpgrade.NamespacedName())

 				// Turns onto NodeRotationDrain
@@ -294,9 +311,13 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
 		r.RollingUpgrade.SetLastNodeDrainTime(&metav1.Time{Time: time.Now()})
 		r.Info("instances drained successfully, terminating", "name", r.RollingUpgrade.NamespacedName())
 		for _, target := range batch {
+			instanceID := aws.StringValue(target.InstanceId)
+			node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+			if node == nil {
+				r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+				continue
+			}
 			var (
-				instanceID   = aws.StringValue(target.InstanceId)
-				node         = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
 				nodeName     = node.GetName()
 				scriptTarget = ScriptTarget{
 					InstanceID:  instanceID,
@@ -423,8 +444,12 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance

 	// check if there is atleast one node that meets the force-referesh criteria
 	if r.RollingUpgrade.IsForceRefresh() {
+		node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
+		if node == nil {
+			r.Info("node object not found in clusterNodes, skipping this node for now", "instanceID", instanceID, "name", r.RollingUpgrade.NamespacedName())
+			return false
+		}
 		var (
-			node                = kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes)
 			nodeCreationTime    = node.CreationTimestamp.Time
 			upgradeCreationTime = r.RollingUpgrade.CreationTimestamp.Time
 		)

From 3a844040c29ea65b33738a882d663b8d95a9dbff Mon Sep 17 00:00:00 2001
From: sbadiger
Date: Fri, 2 Jul 2021 10:55:26 -0700
Subject: [PATCH 6/8] Add nodeEvents handler instead of watch handler

Signed-off-by: sbadiger
---
 controllers/rollingupgrade_controller.go | 60 +++++++++++++++---------
 1 file changed, 38 insertions(+), 22 deletions(-)

diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go
index 5f1fa933..b7f663e3 100644
--- a/controllers/rollingupgrade_controller.go
+++ b/controllers/rollingupgrade_controller.go
@@ -30,11 +30,12 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
@@ -175,10 +176,46 @@ func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.RollingUpgrade{}).
 		Watches(&source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.nodeReconciler)).
+		WithEventFilter(r.nodeEventsHandler()).
 		WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel}).
 		Complete(r)
 }

+func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
+	return predicate.Funcs{
+		CreateFunc: func(e event.CreateEvent) bool {
+			nodeObj, ok := e.Object.(*corev1.Node)
+			if ok {
+				nodeName := e.Object.GetName()
+				log.Debug("nodeEventsHandler[create] nodeObj created, stored in sync map", "nodeName", nodeName)
+				r.ClusterNodesMap.Store(nodeName, nodeObj)
+			}
+			return true
+		},
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			nodeObj, ok := e.ObjectNew.(*corev1.Node)
+			if ok {
+				nodeName := e.ObjectNew.GetName()
+				log.Debug("nodeEventsHandler[update] nodeObj updated, updated in sync map", "nodeName", nodeName)
+				r.ClusterNodesMap.Store(nodeName, nodeObj)
+			}
+			return true
+		},
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			_, ok := e.Object.(*corev1.Node)
+			if ok {
+				nodeName := e.Object.GetName()
+				r.ClusterNodesMap.Delete(nodeName)
+				log.Debug("nodeEventsHandler[delete] - nodeObj not found, deleted from sync map", "name", nodeName)
+			}
+			return true
+		},
+	}
+}
+func (r *RollingUpgradeReconciler) nodeReconciler(obj client.Object) []ctrl.Request {
+	//do nothing, as eventHandler will populate the clusterNodes
+	return nil
+}
 func (r *RollingUpgradeReconciler) SetMaxParallel(n int) {
 	if n >= 1 {
 		r.Info("setting max parallel reconcile", "value", n)
@@ -192,27 +229,6 @@ func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.Rolling
 	}
 }

-func (r *RollingUpgradeReconciler) nodeReconciler(obj client.Object) []ctrl.Request {
-	var (
-		nodeName = obj.GetName()
-		nodeObj  = obj.(*corev1.Node)
-	)
-
-	// for a deleted node, delete it from sync Map as well.
-	var ctx context.Context
-	err := r.Get(ctx, types.NamespacedName{Name: nodeName}, nodeObj)
-	if err != nil {
-		if kerrors.IsNotFound(err) {
-			r.ClusterNodesMap.Delete(nodeName)
-			log.Debug("nodeReconciler[delete] - nodeObj not found, deleted from sync map", "name", nodeName)
-		}
-	} else {
-		log.Debug("nodeReconciler[store]", "nodeName", nodeName)
-		r.ClusterNodesMap.Store(nodeName, obj.(*corev1.Node))
-	}
-	return nil
-}

From cf849e38f97b104afd240c2e590c07b90e8b0bc6 Mon Sep 17 00:00:00 2001
From: sbadiger
Date: Fri, 2 Jul 2021 11:57:43 -0700
Subject: [PATCH 7/8] Ignore Reconciles on nodeEvents

Signed-off-by: sbadiger
---
 controllers/rollingupgrade_controller.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go
index b7f663e3..69e9f24e 100644
--- a/controllers/rollingupgrade_controller.go
+++ b/controllers/rollingupgrade_controller.go
@@ -34,7 +34,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/event"
-	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
@@ -175,7 +174,7 @@ func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&v1alpha1.RollingUpgrade{}).
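
[Editor's sketch] Patches 6 and 7 together settle on the final shape: the node watch stays (so the informer keeps delivering events), its handler becomes nil, and a predicate both maintains the sync.Map and returns false for node events so they never enqueue a RollingUpgrade reconcile. A compilable sketch of that predicate follows, assuming the same controller-runtime generation the diff targets (the one that still ships source.Kind); nodeCachePredicate is an invented name, and in SetupWithManager it would slot in via WithEventFilter exactly as r.nodeEventsHandler() does:

package sketch

import (
	"sync"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// nodeCachePredicate maintains the cache from node events and suppresses
// reconciles for them; non-node objects (the RollingUpgrade CR itself)
// pass through and reconcile as usual.
func nodeCachePredicate(cache *sync.Map) predicate.Funcs {
	return predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool {
			if n, ok := e.Object.(*corev1.Node); ok {
				cache.Store(n.GetName(), n)
				return false // cache only; no reconcile
			}
			return true
		},
		UpdateFunc: func(e event.UpdateEvent) bool {
			if n, ok := e.ObjectNew.(*corev1.Node); ok {
				cache.Store(n.GetName(), n)
				return false
			}
			return true
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			if n, ok := e.Object.(*corev1.Node); ok {
				cache.Delete(n.GetName())
				return false
			}
			return true
		},
	}
}
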
- Watches(&source.Kind{Type: &corev1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.nodeReconciler)). + Watches(&source.Kind{Type: &corev1.Node{}}, nil). WithEventFilter(r.nodeEventsHandler()). WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel}). Complete(r) @@ -189,6 +188,7 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate { nodeName := e.Object.GetName() log.Debug("nodeEventsHandler[create] nodeObj created, stored in sync map", "nodeName", nodeName) r.ClusterNodesMap.Store(nodeName, nodeObj) + return false } return true }, @@ -198,6 +198,7 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate { nodeName := e.ObjectNew.GetName() log.Debug("nodeEventsHandler[update] nodeObj updated, updated in sync map", "nodeName", nodeName) r.ClusterNodesMap.Store(nodeName, nodeObj) + return false } return true }, @@ -207,6 +208,7 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate { nodeName := e.Object.GetName() r.ClusterNodesMap.Delete(nodeName) log.Debug("nodeEventsHandler[delete] - nodeObj not found, deleted from sync map", "name", nodeName) + return false } return true }, From 91a944b70b83e0137dcfc547c71b4e223443dcb5 Mon Sep 17 00:00:00 2001 From: sbadiger Date: Fri, 2 Jul 2021 12:03:04 -0700 Subject: [PATCH 8/8] Add comments Signed-off-by: sbadiger --- controllers/rollingupgrade_controller.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/controllers/rollingupgrade_controller.go b/controllers/rollingupgrade_controller.go index 69e9f24e..63e7f70d 100644 --- a/controllers/rollingupgrade_controller.go +++ b/controllers/rollingupgrade_controller.go @@ -180,6 +180,7 @@ func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +// nodesEventHandler will fetch us the nodes on corresponding events, an alternative to doing explicit API calls. func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate { return predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { @@ -214,10 +215,7 @@ func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate { }, } } -func (r *RollingUpgradeReconciler) nodeReconciler(obj client.Object) []ctrl.Request { - //do nothing, as eventHandler will populate the clusterNodes - return nil -} + func (r *RollingUpgradeReconciler) SetMaxParallel(n int) { if n >= 1 { r.Info("setting max parallel reconcile", "value", n)