Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry Pick from master to 1.22] [FIXES] [Tagging Controller] Fix issues in tagging controller #389

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controllers/tagging/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
Help: "workitem latency of workitem being in the queue and time it takes to process",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20),
},
[]string{"latency_type"})

Expand Down
95 changes: 82 additions & 13 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tagging

import (
"crypto/md5"
"fmt"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -25,7 +26,10 @@ import (
cloudprovider "k8s.io/cloud-provider"
opt "k8s.io/cloud-provider-aws/pkg/controllers/options"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
nodehelpers "k8s.io/cloud-provider/node/helpers"
"k8s.io/klog/v2"
"sort"
"strings"
"time"
)

Expand All @@ -37,7 +41,13 @@ type workItem struct {
enqueueTime time.Time
}

// String implements fmt.Stringer so a work item can be logged with %s or %v.
// The rendered form contains the node's name, the item's requeuingCount and
// its enqueueTime.
func (w workItem) String() string {
	const layout = "[Node: %s, RequeuingCount: %d, EnqueueTime: %s]"
	nodeName := w.node.GetName()
	return fmt.Sprintf(layout, nodeName, w.requeuingCount, w.enqueueTime)
}

const (
taggingControllerLabelKey = "k8s.io/cloud-provider-aws"

maxRequeuingCount = 9

// The label depicting the total number of errors a work item encounters and still succeeds
Expand Down Expand Up @@ -105,9 +115,27 @@ func NewTaggingController(
// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
// that exist before tagging controller starts will show up in the update method
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.tagNodesResources) },
UpdateFunc: func(oldObj, newObj interface{}) { tc.enqueueNode(newObj, tc.tagNodesResources) },
DeleteFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.untagNodeResources) },
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.tagNodesResources)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*v1.Node)
// Check whether tagging is required by inspecting the node's labels, so that an already-tagged node is never
// put into the work queue. The same check is repeated right before the node is tagged: between the time a node
// is enqueued and the time it gets tagged, another event may enqueue the same node again (it does not carry
// the label yet), and the second check saves an unnecessary EC2 call for that duplicate item.
if !tc.isTaggingRequired(node) {
klog.Infof("Skip putting node %s in work queue since it was already tagged earlier.", node.GetName())
return
}

tc.enqueueNode(node, tc.tagNodesResources)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.untagNodeResources)
},
})

return tc, nil
Expand Down Expand Up @@ -147,28 +175,36 @@ func (tc *Controller) process() bool {
return false
}

klog.Infof("Starting to process %v", obj)
klog.Infof("Starting to process %s", obj)

err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %#v", obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %s", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
utilruntime.HandleError(err)
return nil
}
klog.Infof("Instance ID of work item %s is %s", workItem, instanceID)

if awsv1.IsFargateNode(string(instanceID)) {
klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID)
tc.workqueue.Forget(obj)
return nil
}

err = workItem.action(workItem.node)

Expand All @@ -182,20 +218,21 @@ func (tc *Controller) process() bool {
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
}

klog.Errorf("error processing work item '%v': %s, requeuing count exceeded", workItem, err.Error())
klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
} else {
klog.Infof("Finished processing %v", workItem)
klog.Infof("Finished processing %s", workItem)
timeTaken = time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
klog.Infof("Processing latency %s", timeTaken)
}

tc.workqueue.Forget(obj)
return nil
}(obj)

if err != nil {
klog.Errorf("Error occurred while processing %v", obj)
klog.Errorf("Error occurred while processing %s", obj)
utilruntime.HandleError(err)
}

Expand All @@ -221,16 +258,28 @@ func (tc *Controller) tagNodesResources(node *v1.Node) error {
// tagEc2Instance applies the controller's configured tags (tc.tags) to the
// EC2 instance backing the given node and, once tagging succeeds, labels the
// node with the checksum of those tags under taggingControllerLabelKey.
//
// It returns nil without calling EC2 when isTaggingRequired reports that the
// node is already tagged, the error from TagResource when tagging fails, and
// a new error when the label could not be applied to the node.
//
// NOTE(review): this span comes from a rendered diff, so the pre-change and
// post-change wording of two log statements both appear below; presumably
// only the second of each pair is meant to remain - confirm against the file.
func (tc *Controller) tagEc2Instance(node *v1.Node) error {
// Skip the EC2 call entirely when the node needs no (re)tagging.
if !tc.isTaggingRequired(node) {
klog.Infof("Skip tagging node %s since it was already tagged earlier.", node.GetName())
return nil
}

// NOTE(review): the conversion error is discarded here. process() performs
// the same conversion and returns early on failure before it invokes the
// work item's action, so this presumably cannot fail on that path - confirm
// for any other caller.
instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()

err := tc.cloud.TagResource(string(instanceID), tc.tags)

if err != nil {
// Old and new wording of the same error log (see the note on the function).
klog.Errorf("Error in tagging EC2 instance for node %s, error: %v", node.GetName(), err)
klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
return err
}

// Old and new wording of the same success log (see the note on the function).
klog.Infof("Successfully tagged %s with %v", instanceID, tc.tags)
// Record the checksum of the applied tags on the node; isTaggingRequired
// compares this label value against the current checksum.
labels := map[string]string{taggingControllerLabelKey: tc.getChecksumOfTags()}
klog.Infof("Successfully tagged %s with %v. Labeling the nodes with tagging controller labels now.", instanceID, tc.tags)
// The helper's boolean result is the only failure signal used here, so a
// fresh error is built for the caller.
if !nodehelpers.AddOrUpdateLabelsOnNode(tc.kubeClient, labels, node) {
klog.Errorf("Couldn't apply labels %s to node %s.", labels, node.GetName())
return fmt.Errorf("couldn't apply labels %s to node %s", labels, node.GetName())
}

klog.Infof("Successfully labeled node %s with %v.", node.GetName(), labels)

return nil
}
Expand Down Expand Up @@ -259,7 +308,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
err := tc.cloud.UntagResource(string(instanceID), tc.tags)

if err != nil {
klog.Errorf("Error in untagging EC2 instance for node %s, error: %v", node.GetName(), err)
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
return err
}

Expand All @@ -270,8 +319,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {

// enqueueNode wraps the node and the action to run on it in a work item
// and adds that work item to the workqueue
func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) error) {
node := obj.(*v1.Node)
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
action: action,
Expand All @@ -281,3 +329,24 @@ func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) er
tc.workqueue.Add(item)
klog.Infof("Added %s to the workqueue", item)
}

// isTaggingRequired reports whether the node still has to be tagged. It
// returns true when the node has no taggingControllerLabelKey label, or when
// that label's value differs from the checksum of the controller's current
// tags; it returns false only when the label matches the checksum.
func (tc *Controller) isTaggingRequired(node *v1.Node) bool {
	// Indexing a nil map is safe and reports the key as absent, so a node
	// without any labels takes the same path as one missing just this label.
	recorded, found := node.Labels[taggingControllerLabelKey]
	if !found {
		return true
	}

	return recorded != tc.getChecksumOfTags()
}

// getChecksumOfTags returns the hex-encoded MD5 digest of the controller's
// tags. Each tag is rendered as "key=value", the pairs are sorted and joined
// with commas, and the resulting string is hashed. Sorting makes the digest
// independent of Go's randomized map iteration order.
func (tc *Controller) getChecksumOfTags() string {
	pairs := make([]string, 0, len(tc.tags))
	for k, v := range tc.tags {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)

	digest := md5.Sum([]byte(strings.Join(pairs, ",")))
	return fmt.Sprintf("%x", digest)
}
52 changes: 50 additions & 2 deletions pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,54 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
toBeTagged: true,
expectedMessages: []string{"Successfully tagged i-0001"},
},
{
name: "node0 joins the cluster and was tagged earlier with different tags.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
taggingControllerLabelKey: "9767c4972ba72e87ab553bad2afde741", // MD5 for key1=value1
},
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
},
toBeTagged: true,
expectedMessages: []string{"Successfully tagged i-0001"},
},
{
name: "node0 joins the cluster but isn't tagged because it was already tagged earlier.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
taggingControllerLabelKey: "c812faa65d1d5e5aefa6b069b3da39df", // MD5 for key1=value1,key2=value2
},
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
},
toBeTagged: true,
expectedMessages: []string{"Skip tagging node node0 since it was already tagged earlier."},
},
{
name: "fargate node joins the cluster.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fargatenode0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2a/2ea696a557-9e55466d21eb4f83a99a9aa396bbd134/fargate-ip-10-0-55-27.us-west-2.compute.internal",
},
},
toBeTagged: true,
expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a Fargate node"},
},
{
name: "node0 leaves the cluster, failed to untag.",
currNode: &v1.Node{
Expand All @@ -81,7 +129,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
},
},
toBeTagged: false,
expectedMessages: []string{"Error in untagging EC2 instance for node node0"},
expectedMessages: []string{"Error in untagging EC2 instance i-error for node node0"},
},
{
name: "node0 leaves the cluster.",
Expand Down Expand Up @@ -124,7 +172,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
kubeClient: clientset,
cloud: fakeAws,
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key": "value"},
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,7 +1836,7 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string
return nil, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
if eni == nil || err != nil {
return nil, err
Expand Down Expand Up @@ -1879,7 +1879,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin
return false, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
return eni != nil, err
}
Expand Down Expand Up @@ -1919,7 +1919,7 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str
return false, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
return eni != nil, err
}
Expand Down Expand Up @@ -1980,7 +1980,7 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string)
return "", err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
return "", nil
}

Expand Down Expand Up @@ -2089,7 +2089,7 @@ func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (clo
return cloudprovider.Zone{}, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
if eni == nil || err != nil {
return cloudprovider.Zone{}, err
Expand Down Expand Up @@ -5248,8 +5248,8 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
return awsInstance, instance, err
}

// isFargateNode returns true if given node runs on Fargate compute
func isFargateNode(nodeName string) bool {
// IsFargateNode returns true if given node runs on Fargate compute
func IsFargateNode(nodeName string) bool {
return strings.HasPrefix(nodeName, fargateNodeNamePrefix)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/v1/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) {

// We sanity check the resulting instance ID; the two known formats are
// i-12345678 and i-12345678abcdef01
if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || isFargateNode(awsID)) {
if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || IsFargateNode(awsID)) {
return "", fmt.Errorf("Invalid format for AWS instance (%s)", name)
}

Expand Down