Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor IngressLink controller #6348

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 0 additions & 72 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,22 +564,6 @@ func (nsi *namespacedInformer) addTransportServerHandler(handlers cache.Resource
nsi.cacheSyncs = append(nsi.cacheSyncs, informer.HasSynced)
}

func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.ResourceEventHandlerFuncs, name string) {
optionsModifier := func(options *meta_v1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": name}.String()
}

informer := dynamicinformer.NewFilteredDynamicInformer(lbc.dynClient, ingressLinkGVR, lbc.controllerNamespace, lbc.resync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, optionsModifier)

informer.Informer().AddEventHandlerWithResyncPeriod(handlers, lbc.resync)

lbc.ingressLinkInformer = informer.Informer()
lbc.ingressLinkLister = informer.Informer().GetStore()

lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.ingressLinkInformer.HasSynced)
}

func (lbc *LoadBalancerController) addNamespaceHandler(handlers cache.ResourceEventHandlerFuncs, nsLabel string) {
optionsModifier := func(options *meta_v1.ListOptions) {
options.LabelSelector = nsLabel
Expand Down Expand Up @@ -1196,62 +1180,6 @@ func (lbc *LoadBalancerController) cleanupUnwatchedNamespacedResources(nsi *name
nsi.stop()
}

func (lbc *LoadBalancerController) syncIngressLink(task task) {
key := task.Key
glog.V(2).Infof("Adding, Updating or Deleting IngressLink: %v", key)

obj, exists, err := lbc.ingressLinkLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

if !exists {
// IngressLink got removed
lbc.statusUpdater.ClearStatusFromIngressLink()
} else {
// IngressLink is added or updated
link := obj.(*unstructured.Unstructured)

// spec.virtualServerAddress contains the IP of the BIG-IP device
ip, found, err := unstructured.NestedString(link.Object, "spec", "virtualServerAddress")
if err != nil {
glog.Errorf("Failed to get virtualServerAddress from IngressLink %s: %v", key, err)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else if !found {
glog.Errorf("virtualServerAddress is not found in IngressLink %s", key)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else if ip == "" {
glog.Warningf("IngressLink %s has the empty virtualServerAddress field", key)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else {
lbc.statusUpdater.SaveStatusFromIngressLink(ip)
}
}

if lbc.reportStatusEnabled() {
ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})

glog.V(3).Infof("Updating status for %v Ingresses", len(ingresses))

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(ingresses)
if err != nil {
glog.Errorf("Error updating ingress status in syncIngressLink: %v", err)
}
}

if lbc.areCustomResourcesEnabled && lbc.reportCustomResourceStatusEnabled() {
virtualServers := lbc.configuration.GetResourcesWithFilter(resourceFilter{VirtualServers: true})

glog.V(3).Infof("Updating status for %v VirtualServers", len(virtualServers))

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(virtualServers)
if err != nil {
glog.V(3).Infof("Error updating VirtualServer/VirtualServerRoute status in syncIngressLink: %v", err)
}
}
}

func (lbc *LoadBalancerController) syncPolicy(task task) {
key := task.Key
var obj interface{}
Expand Down
42 changes: 0 additions & 42 deletions internal/k8s/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,48 +395,6 @@ func createPolicyHandlers(lbc *LoadBalancerController) cache.ResourceEventHandle
}
}

func createIngressLinkHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
link := obj.(*unstructured.Unstructured)
glog.V(3).Infof("Adding IngressLink: %v", link.GetName())
lbc.AddSyncQueue(link)
},
DeleteFunc: func(obj interface{}) {
link, isUnstructured := obj.(*unstructured.Unstructured)

if !isUnstructured {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
link, ok = deletedState.Obj.(*unstructured.Unstructured)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Unstructured object: %v", deletedState.Obj)
return
}
}

glog.V(3).Infof("Removing IngressLink: %v", link.GetName())
lbc.AddSyncQueue(link)
},
UpdateFunc: func(old, cur interface{}) {
oldLink := old.(*unstructured.Unstructured)
curLink := cur.(*unstructured.Unstructured)
different, err := areResourcesDifferent(oldLink, curLink)
if err != nil {
glog.V(3).Infof("Error when comparing IngressLinks: %v", err)
lbc.AddSyncQueue(curLink)
}
if different {
glog.V(3).Infof("IngressLink %v changed, syncing", oldLink.GetName())
lbc.AddSyncQueue(curLink)
}
},
}
}

// areResourcesDifferent returns true if the resources are different based on their spec.
func areResourcesDifferent(oldresource, resource *unstructured.Unstructured) (bool, error) {
oldSpec, found, err := unstructured.NestedMap(oldresource.Object, "spec")
Expand Down
124 changes: 124 additions & 0 deletions internal/k8s/ingress_link.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package k8s

import (
"github.com/golang/glog"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)

func createIngressLinkHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
link := obj.(*unstructured.Unstructured)
glog.V(3).Infof("Adding IngressLink: %v", link.GetName())
lbc.AddSyncQueue(link)
},
DeleteFunc: func(obj interface{}) {
link, isUnstructured := obj.(*unstructured.Unstructured)

Check warning on line 20 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L12-L20

Added lines #L12 - L20 were not covered by tests

if !isUnstructured {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return

Check warning on line 26 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L22-L26

Added lines #L22 - L26 were not covered by tests
}
link, ok = deletedState.Obj.(*unstructured.Unstructured)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Unstructured object: %v", deletedState.Obj)
return

Check warning on line 31 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L28-L31

Added lines #L28 - L31 were not covered by tests
}
}

glog.V(3).Infof("Removing IngressLink: %v", link.GetName())
lbc.AddSyncQueue(link)

Check warning on line 36 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L35-L36

Added lines #L35 - L36 were not covered by tests
},
UpdateFunc: func(old, cur interface{}) {
oldLink := old.(*unstructured.Unstructured)
curLink := cur.(*unstructured.Unstructured)
different, err := areResourcesDifferent(oldLink, curLink)
if err != nil {
glog.V(3).Infof("Error when comparing IngressLinks: %v", err)
lbc.AddSyncQueue(curLink)

Check warning on line 44 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L38-L44

Added lines #L38 - L44 were not covered by tests
}
if different {
glog.V(3).Infof("IngressLink %v changed, syncing", oldLink.GetName())
lbc.AddSyncQueue(curLink)

Check warning on line 48 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L46-L48

Added lines #L46 - L48 were not covered by tests
}
},
}
}

func (lbc *LoadBalancerController) addIngressLinkHandler(handlers cache.ResourceEventHandlerFuncs, name string) {
optionsModifier := func(options *meta_v1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": name}.String()

Check warning on line 56 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L54-L56

Added lines #L54 - L56 were not covered by tests
}

informer := dynamicinformer.NewFilteredDynamicInformer(lbc.dynClient, ingressLinkGVR, lbc.controllerNamespace, lbc.resync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, optionsModifier)

Check warning on line 60 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L59-L60

Added lines #L59 - L60 were not covered by tests

informer.Informer().AddEventHandlerWithResyncPeriod(handlers, lbc.resync) //nolint:errcheck,gosec

Check warning on line 62 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L62

Added line #L62 was not covered by tests
pdabelf5 marked this conversation as resolved.
Show resolved Hide resolved

lbc.ingressLinkInformer = informer.Informer()
lbc.ingressLinkLister = informer.Informer().GetStore()

Check warning on line 65 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L64-L65

Added lines #L64 - L65 were not covered by tests

lbc.cacheSyncs = append(lbc.cacheSyncs, lbc.ingressLinkInformer.HasSynced)

Check warning on line 67 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L67

Added line #L67 was not covered by tests
}

func (lbc *LoadBalancerController) syncIngressLink(task task) {
key := task.Key
glog.V(2).Infof("Adding, Updating or Deleting IngressLink: %v", key)

Check warning on line 72 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L70-L72

Added lines #L70 - L72 were not covered by tests

obj, exists, err := lbc.ingressLinkLister.GetByKey(key)
if err != nil {
lbc.syncQueue.Requeue(task, err)
return

Check warning on line 77 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}

if !exists {

Check warning on line 80 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L80

Added line #L80 was not covered by tests
// IngressLink got removed
lbc.statusUpdater.ClearStatusFromIngressLink()
} else {

Check warning on line 83 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L82-L83

Added lines #L82 - L83 were not covered by tests
// IngressLink is added or updated
link := obj.(*unstructured.Unstructured)

Check warning on line 85 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L85

Added line #L85 was not covered by tests

// spec.virtualServerAddress contains the IP of the BIG-IP device
ip, found, err := unstructured.NestedString(link.Object, "spec", "virtualServerAddress")
if err != nil {
glog.Errorf("Failed to get virtualServerAddress from IngressLink %s: %v", key, err)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else if !found {
glog.Errorf("virtualServerAddress is not found in IngressLink %s", key)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else if ip == "" {
glog.Warningf("IngressLink %s has the empty virtualServerAddress field", key)
lbc.statusUpdater.ClearStatusFromIngressLink()
} else {
lbc.statusUpdater.SaveStatusFromIngressLink(ip)

Check warning on line 99 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L88-L99

Added lines #L88 - L99 were not covered by tests
}
}

if lbc.reportStatusEnabled() {
ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})

Check warning on line 104 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L103-L104

Added lines #L103 - L104 were not covered by tests

glog.V(3).Infof("Updating status for %v Ingresses", len(ingresses))

Check warning on line 106 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L106

Added line #L106 was not covered by tests

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(ingresses)
if err != nil {
glog.Errorf("Error updating ingress status in syncIngressLink: %v", err)

Check warning on line 110 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L108-L110

Added lines #L108 - L110 were not covered by tests
}
}

if lbc.areCustomResourcesEnabled && lbc.reportCustomResourceStatusEnabled() {
virtualServers := lbc.configuration.GetResourcesWithFilter(resourceFilter{VirtualServers: true})

Check warning on line 115 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L114-L115

Added lines #L114 - L115 were not covered by tests

glog.V(3).Infof("Updating status for %v VirtualServers", len(virtualServers))

Check warning on line 117 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L117

Added line #L117 was not covered by tests

err := lbc.statusUpdater.UpdateExternalEndpointsForResources(virtualServers)
if err != nil {
glog.V(3).Infof("Error updating VirtualServer/VirtualServerRoute status in syncIngressLink: %v", err)

Check warning on line 121 in internal/k8s/ingress_link.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/ingress_link.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}
}
}
Loading