Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade-manager-v2: Add nodeEvents handler instead of a watch handler #272

Merged
merged 9 commits into from
Jul 12, 2021
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rules:
- get
- list
- patch
- watch
- apiGroups:
- ""
resources:
Expand Down
8 changes: 1 addition & 7 deletions controllers/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
type DiscoveredState struct {
*RollingUpgradeAuthenticator
logr.Logger
ClusterNodes *corev1.NodeList
ClusterNodes []*corev1.Node
LaunchTemplates []*ec2.LaunchTemplate
ScalingGroups []*autoscaling.Group
InProgressInstances []string
Expand Down Expand Up @@ -67,11 +67,5 @@ func (d *DiscoveredState) Discover() error {
}
d.InProgressInstances = inProgressInstances

nodes, err := d.KubernetesClientSet.ListClusterNodes()
if err != nil || nodes == nil || nodes.Size() == 0 {
return errors.Wrap(err, "failed to discover cluster nodes")
}
d.ClusterNodes = nodes

return nil
}
128 changes: 118 additions & 10 deletions controllers/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -125,9 +126,17 @@ func createNodeList() *corev1.NodeList {
}
}

func createNode() *corev1.Node {
// createNodeSlice returns the fixed set of three mock nodes used by the tests.
func createNodeSlice() []*corev1.Node {
	names := []string{"mock-node-1", "mock-node-2", "mock-node-3"}
	nodes := make([]*corev1.Node, 0, len(names))
	for _, name := range names {
		nodes = append(nodes, createNode(name))
	}
	return nodes
}

func createNode(name string) *corev1.Node {
return &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: "mock-node-1", Namespace: "default"},
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
Spec: corev1.NodeSpec{ProviderID: "foo-bar/mock-instance-1"},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
Expand All @@ -140,18 +149,27 @@ func createNode() *corev1.Node {
// AWS
type MockAutoscalingGroup struct {
autoscalingiface.AutoScalingAPI
errorFlag bool
awsErr awserr.Error
errorInstanceId string
autoScalingGroups []*autoscaling.Group
errorFlag bool
awsErr awserr.Error
errorInstanceId string
autoScalingGroups []*autoscaling.Group
Groups map[string]*autoscaling.Group
LaunchConfigurations map[string]*autoscaling.LaunchConfiguration
}

// launchTemplateInfo is the per-template record stored in
// MockEC2.LaunchTemplates; the map key serves as the launch template id.
type launchTemplateInfo struct {
	data *ec2.ResponseLaunchTemplateData // template payload (not read by the mocks visible here)
	name *string                         // template name, matched against "tag:Name" filters in DescribeLaunchTemplates
}
type MockEC2 struct {
ec2iface.EC2API
awsErr awserr.Error
reservations []*ec2.Reservation
awsErr awserr.Error
reservations []*ec2.Reservation
LaunchTemplates map[string]*launchTemplateInfo
}

var _ ec2iface.EC2API = &MockEC2{}

func createASGInstance(instanceID string, launchConfigName string) *autoscaling.Instance {
return &autoscaling.Instance{
InstanceId: &instanceID,
Expand Down Expand Up @@ -188,8 +206,8 @@ func createASGClient() *MockAutoscalingGroup {
}
}

func createEc2Client() MockEC2 {
return MockEC2{}
// createEc2Client returns a fresh EC2 mock with no canned responses configured.
func createEc2Client() *MockEC2 {
	client := &MockEC2{}
	return client
}

func createAmazonClient(t *testing.T) *awsprovider.AmazonClientSet {
Expand All @@ -199,6 +217,8 @@ func createAmazonClient(t *testing.T) *awsprovider.AmazonClientSet {
}
}

/******************************* AWS MOCKS *******************************/

func (mockAutoscalingGroup MockAutoscalingGroup) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) {
output := &autoscaling.TerminateInstanceInAutoScalingGroupOutput{}
if mockAutoscalingGroup.errorFlag {
Expand All @@ -213,3 +233,91 @@ func (mockAutoscalingGroup MockAutoscalingGroup) TerminateInstanceInAutoScalingG
output.Activity = &asgChange
return output, nil
}

// DescribeLaunchTemplatesPages mocks the paginated launch-template describe
// call by delegating to DescribeLaunchTemplates and delivering the whole
// result as a single page.
func (m *MockEC2) DescribeLaunchTemplatesPages(request *ec2.DescribeLaunchTemplatesInput, callback func(*ec2.DescribeLaunchTemplatesOutput, bool) bool) error {
	page, err := m.DescribeLaunchTemplates(request)
	if err != nil {
		return err
	}

	// This single page is also the last one, so signal lastPage=true as the
	// real SDK pagination would.
	callback(page, true)

	return nil
}

// DescribeLaunchTemplates mocks the EC2 DescribeLaunchTemplates API. It
// returns every stored launch template for which all request filters match:
// a "tag:Name" filter must equal the template name, and any
// "tag:kubernetes.io/cluster/..." filter is accepted unconditionally.
func (m *MockEC2) DescribeLaunchTemplates(request *ec2.DescribeLaunchTemplatesInput) (*ec2.DescribeLaunchTemplatesOutput, error) {

	o := &ec2.DescribeLaunchTemplatesOutput{}

	if m.LaunchTemplates == nil {
		return o, nil
	}

	for id, ltInfo := range m.LaunchTemplates {
		launchTemplateName := aws.StringValue(ltInfo.name)

		allFiltersMatch := true
		for _, filter := range request.Filters {
			filterName := aws.StringValue(filter.Name)
			// Guard against a filter with no values to avoid an index panic.
			var filterValue string
			if len(filter.Values) > 0 {
				filterValue = aws.StringValue(filter.Values[0])
			}

			filterMatches := false
			if filterName == "tag:Name" && filterValue == launchTemplateName {
				filterMatches = true
			}
			if strings.HasPrefix(filterName, "tag:kubernetes.io/cluster/") {
				filterMatches = true
			}

			if !filterMatches {
				allFiltersMatch = false
				break
			}
		}

		if allFiltersMatch {
			o.LaunchTemplates = append(o.LaunchTemplates, &ec2.LaunchTemplate{
				LaunchTemplateName: aws.String(launchTemplateName),
				LaunchTemplateId:   aws.String(id),
			})
		}
	}

	return o, nil
}

// DescribeAutoScalingGroupsPages mocks the paginated ASG describe call by
// delegating to DescribeAutoScalingGroups and delivering everything in one page.
func (m *MockAutoscalingGroup) DescribeAutoScalingGroupsPages(request *autoscaling.DescribeAutoScalingGroupsInput, callback func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error {
	// For the mock, we just send everything in one page
	page, err := m.DescribeAutoScalingGroups(request)
	if err != nil {
		return err
	}

	// The single page is the last page, so report lastPage=true as the real
	// SDK pagination would.
	callback(page, true)

	return nil
}

// DescribeAutoScalingGroups mocks the ASG describe call by returning the
// canned scaling groups built by createASGs; it never fails.
func (m *MockAutoscalingGroup) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) {
	output := &autoscaling.DescribeAutoScalingGroupsOutput{}
	output.AutoScalingGroups = createASGs()
	return output, nil
}

// DescribeInstancesPages mocks the paginated EC2 instance describe call by
// delegating to DescribeInstances and delivering everything in one page.
func (m *MockEC2) DescribeInstancesPages(request *ec2.DescribeInstancesInput, callback func(*ec2.DescribeInstancesOutput, bool) bool) error {
	// For the mock, we just send everything in one page
	page, err := m.DescribeInstances(request)
	if err != nil {
		return err
	}

	// The single page is the last page, so report lastPage=true as the real
	// SDK pagination would.
	callback(page, true)

	return nil
}

// DescribeInstances mocks the EC2 instance describe call; it always reports
// an empty result set and no error.
func (m *MockEC2) DescribeInstances(*ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
	var output ec2.DescribeInstancesOutput
	return &output, nil
}
15 changes: 7 additions & 8 deletions controllers/providers/kubernetes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"os"
"os/user"
"reflect"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -87,28 +86,28 @@ func GetKubernetesLocalConfig() (*rest.Config, error) {
return config, nil
}

func SelectNodeByInstanceID(instanceID string, nodes *corev1.NodeList) corev1.Node {
func SelectNodeByInstanceID(instanceID string, nodes []*corev1.Node) *corev1.Node {
if nodes != nil {
for _, node := range nodes.Items {
for _, node := range nodes {
nodeID := GetNodeInstanceID(node)
if strings.EqualFold(instanceID, nodeID) {
return node
}
}
}
return corev1.Node{}
return nil
}

func GetNodeInstanceID(node corev1.Node) string {
if !reflect.DeepEqual(node, &corev1.Node{}) {
func GetNodeInstanceID(node *corev1.Node) string {
if node != nil {
tokens := strings.Split(node.Spec.ProviderID, "/")
nodeInstanceID := tokens[len(tokens)-1]
return nodeInstanceID
}
return ""
}

func IsNodeReady(node corev1.Node) bool {
func IsNodeReady(node *corev1.Node) bool {
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
return true
Expand All @@ -117,7 +116,7 @@ func IsNodeReady(node corev1.Node) bool {
return false
}

func IsNodePassesReadinessGates(node corev1.Node, requiredReadinessGates []v1alpha1.NodeReadinessGate) bool {
func IsNodePassesReadinessGates(node *corev1.Node, requiredReadinessGates []v1alpha1.NodeReadinessGate) bool {
if len(requiredReadinessGates) == 0 {
return true
}
Expand Down
67 changes: 66 additions & 1 deletion controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ import (
"github.com/keikoproj/aws-sdk-go-cache/cache"
"github.com/keikoproj/upgrade-manager/api/v1alpha1"
"github.com/keikoproj/upgrade-manager/controllers/common"
"github.com/keikoproj/upgrade-manager/controllers/common/log"
awsprovider "github.com/keikoproj/upgrade-manager/controllers/providers/aws"
kubeprovider "github.com/keikoproj/upgrade-manager/controllers/providers/kubernetes"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// RollingUpgradeReconciler reconciles a RollingUpgrade object
Expand All @@ -47,6 +52,7 @@ type RollingUpgradeReconciler struct {
Auth *RollingUpgradeAuthenticator
DrainGroupMapper *sync.Map
DrainErrorMapper *sync.Map
ClusterNodesMap *sync.Map
}

type RollingUpgradeAuthenticator struct {
Expand All @@ -56,7 +62,7 @@ type RollingUpgradeAuthenticator struct {

// +kubebuilder:rbac:groups=upgrademgr.keikoproj.io,resources=rollingupgrades,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=upgrademgr.keikoproj.io,resources=rollingupgrades/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;patch
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;patch;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=list
// +kubebuilder:rbac:groups=core,resources=events,verbs=create
// +kubebuilder:rbac:groups=core,resources=pods/eviction,verbs=create
Expand Down Expand Up @@ -145,6 +151,13 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
},
RollingUpgrade: rollingUpgrade,
metricsMutex: &sync.Mutex{},

// discover the K8s cluster at controller level through watch
Cloud: func() *DiscoveredState {
var c = NewDiscoveredState(r.Auth, r.Logger)
c.ClusterNodes = r.getClusterNodes()
return c
}(),
}

// process node rotation
Expand All @@ -161,10 +174,48 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// SetupWithManager registers the RollingUpgrade controller with the manager,
// additionally watching Node objects so node events are routed through
// nodeEventsHandler, and caps reconcile concurrency at maxParallel.
func (r *RollingUpgradeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.RollingUpgrade{})
	builder = builder.Watches(&source.Kind{Type: &corev1.Node{}}, nil)
	builder = builder.WithEventFilter(r.nodeEventsHandler())
	builder = builder.WithOptions(controller.Options{MaxConcurrentReconciles: r.maxParallel})
	return builder.Complete(r)
}

// nodeEventsHandler returns a predicate that mirrors cluster Node objects into
// the reconciler's ClusterNodesMap on create/update/delete events, as an
// alternative to doing explicit API calls. Node events are absorbed (return
// false, no reconcile is triggered); all other events pass through (return true).
func (r *RollingUpgradeReconciler) nodeEventsHandler() predicate.Predicate {
	return predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool {
			node, isNode := e.Object.(*corev1.Node)
			if !isNode {
				return true
			}
			name := e.Object.GetName()
			log.Debug("nodeEventsHandler[create] nodeObj created, stored in sync map", "nodeName", name)
			r.ClusterNodesMap.Store(name, node)
			return false
		},
		UpdateFunc: func(e event.UpdateEvent) bool {
			node, isNode := e.ObjectNew.(*corev1.Node)
			if !isNode {
				return true
			}
			name := e.ObjectNew.GetName()
			log.Debug("nodeEventsHandler[update] nodeObj updated, updated in sync map", "nodeName", name)
			r.ClusterNodesMap.Store(name, node)
			return false
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			if _, isNode := e.Object.(*corev1.Node); !isNode {
				return true
			}
			name := e.Object.GetName()
			r.ClusterNodesMap.Delete(name)
			log.Debug("nodeEventsHandler[delete] - nodeObj not found, deleted from sync map", "name", name)
			return false
		},
	}
}

func (r *RollingUpgradeReconciler) SetMaxParallel(n int) {
if n >= 1 {
r.Info("setting max parallel reconcile", "value", n)
Expand All @@ -177,3 +228,17 @@ func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.Rolling
r.Info("failed to update status", "message", err.Error(), "name", rollingUpgrade.NamespacedName())
}
}

// getClusterNodes snapshots the Node objects collected by nodeEventsHandler
// into a slice. Returns nil when no nodes have been observed yet.
func (r *RollingUpgradeReconciler) getClusterNodes() []*corev1.Node {
	var clusterNodes []*corev1.Node

	// Append directly while ranging over the sync.Map; the previous
	// implementation copied everything into an intermediate map (stringifying
	// the keys with fmt.Sprint) only to iterate it again, which bought nothing.
	r.ClusterNodesMap.Range(func(_, value interface{}) bool {
		// Only *corev1.Node values are ever stored; guard the assertion so a
		// stray entry cannot panic the reconciler.
		if node, ok := value.(*corev1.Node); ok {
			clusterNodes = append(clusterNodes, node)
		}
		return true
	})
	return clusterNodes
}
Loading