diff --git a/controller/k8s/api.go b/controller/k8s/api.go index e3e90fb026d72..73056e04b9526 100644 --- a/controller/k8s/api.go +++ b/controller/k8s/api.go @@ -12,6 +12,7 @@ import ( l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned" l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions" ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1" + linkinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/link/v1alpha2" srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta3" spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2" "github.com/linkerd/linkerd2/pkg/k8s" @@ -52,6 +53,7 @@ type API struct { es discoveryinformers.EndpointSliceInformer ew ewinformers.ExternalWorkloadInformer job batchv1informers.JobInformer + link linkinformers.LinkInformer mwc arinformers.MutatingWebhookConfigurationInformer ns coreinformers.NamespaceInformer pod coreinformers.PodInformer @@ -248,6 +250,13 @@ func newAPI( api.job = sharedInformers.Batch().V1().Jobs() api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced) api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer()) + case Link: + if l5dCrdSharedInformers == nil { + panic("Linkerd CRD shared informer not configured") + } + api.link = l5dCrdSharedInformers.Link().V1alpha2().Links() + api.syncChecks = append(api.syncChecks, api.link.Informer().HasSynced) + api.promGauges.addInformerSize(k8s.Link, informerLabels, api.link.Informer()) case MWC: api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations() api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced) diff --git a/controller/k8s/api_resource.go b/controller/k8s/api_resource.go index ee4ef760a07c7..c3cb4b1d6330d 100644 --- a/controller/k8s/api_resource.go +++ b/controller/k8s/api_resource.go @@ -32,6 +32,7 @@ const ( ES // EndpointSlice resource ExtWorkload Job + Link MWC NS Pod diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index b34da0c2f963e..7b2298689d37b 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -57,6 +57,7 @@ func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrd DS, Endpoint, Job, + Link, MWC, NS, Pod, diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index 39a094d2d37c0..0cc976e7e4851 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -21,7 +21,6 @@ import ( log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -149,58 +148,7 @@ func Main(args []string) { log.Infof("Starting Link informer") informerFactory.Start(ctx.Done()) - _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - link, ok := obj.(*v1alpha2.Link) - if !ok { - log.Errorf("object is not a Link: %+v", obj) - return - } - if link.GetName() == linkName { - select { - case results <- link: - default: - log.Errorf("Link update dropped (queue full): %s", link.GetName()) - } - } - }, - UpdateFunc: func(_, obj interface{}) { - link, ok := obj.(*v1alpha2.Link) - if !ok { - log.Errorf("object is not a Link: %+v", obj) - return - } - if link.GetName() == linkName { - select { - case results <- link: - default: - log.Errorf("Link update dropped (queue full): %s", link.GetName()) - } - } - }, - DeleteFunc: func(obj interface{}) { - link, ok := obj.(*v1alpha2.Link) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj) - return - } - link, ok = tombstone.Obj.(*v1alpha2.Link) - if !ok { - log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj) - return - } - } - if link.GetName() == linkName { - select { - case results <- nil: // nil indicates the link was deleted - default: - log.Errorf("Link delete dropped (queue full): %s", link.GetName()) - } - } - }, - }) + _, err := informer.AddEventHandler(servicemirror.GetLinkHandlers(results, linkName)) if err != nil { log.Fatalf("Failed to add event handler to Link informer: %s", err) } diff --git a/multicluster/service-mirror/link_handlers.go b/multicluster/service-mirror/link_handlers.go new file mode 100644 index 0000000000000..512379172ac46 --- /dev/null +++ b/multicluster/service-mirror/link_handlers.go @@ -0,0 +1,73 @@ +package servicemirror + +import ( + "reflect" + + "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2" + log "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/cache" +) + +func GetLinkHandlers(results chan<- *v1alpha2.Link, linkName string) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + link, ok := obj.(*v1alpha2.Link) + if !ok { + log.Errorf("object is not a Link: %+v", obj) + return + } + if link.GetName() == linkName { + select { + case results <- link: + default: + log.Errorf("Link update dropped (queue full): %s", link.GetName()) + } + } + }, + UpdateFunc: func(oldObj, currentObj interface{}) { + oldLink, ok := oldObj.(*v1alpha2.Link) + if !ok { + log.Errorf("object is not a Link: %+v", oldObj) + return + } + currentLink, ok := currentObj.(*v1alpha2.Link) + if !ok { + log.Errorf("object is not a Link: %+v", currentObj) + return + } + if reflect.DeepEqual(oldLink.Spec, currentLink.Spec) { + log.Debugf("Link update ignored (only status changed): %s", currentLink.GetName()) + return + } + if currentLink.GetName() == linkName { + select { + case results <- currentLink: + default: + log.Errorf("Link update dropped (queue full): %s", currentLink.GetName()) + } + } + }, + DeleteFunc: func(obj interface{}) { + link, ok := obj.(*v1alpha2.Link) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj) + return + } + link, ok = tombstone.Obj.(*v1alpha2.Link) + if !ok { + log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj) + return + } + } + if link.GetName() == linkName { + select { + case results <- nil: // nil indicates the link was deleted + default: + log.Errorf("Link delete dropped (queue full): %s", link.GetName()) + } + } + }, + } +} diff --git a/multicluster/service-mirror/link_handlers_test.go b/multicluster/service-mirror/link_handlers_test.go new file mode 100644 index 0000000000000..61eef5e33c5cc --- /dev/null +++ b/multicluster/service-mirror/link_handlers_test.go @@ -0,0 +1,141 @@ +package servicemirror + +import ( + "context" + "encoding/json" + "log" + "testing" + "time" + + "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2" + l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions" + "github.com/linkerd/linkerd2/controller/k8s" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +const nsName = "ns1" +const linkName = "linkName" + +func TestLinkHandlers(t *testing.T) { + k8sAPI, l5dAPI, err := k8s.NewFakeAPIWithL5dClient() + if err != nil { + t.Fatal(err) + } + k8sAPI.Sync(nil) + + informerFactory := l5dcrdinformer.NewSharedInformerFactoryWithOptions( + l5dAPI, + k8s.ResyncTime, + l5dcrdinformer.WithNamespace(nsName), + ) + informer := informerFactory.Link().V1alpha2().Links().Informer() + informerFactory.Start(context.Background().Done()) + + results := make(chan *v1alpha2.Link, 100) + _, err = informer.AddEventHandler(GetLinkHandlers(results, linkName)) + if err != nil { + t.Fatal(err) + } + + // test that a message is received when a link is created + _, err = k8sAPI.Client.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + link := &v1alpha2.Link{ + ObjectMeta: metav1.ObjectMeta{ + Name: linkName, + Namespace: nsName, + }, + Spec: v1alpha2.LinkSpec{ProbeSpec: v1alpha2.ProbeSpec{Timeout: "30s"}}, + } + _, err = l5dAPI.LinkV1alpha2().Links(nsName).Create(context.Background(), link, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case link := <-results: + if link.GetName() != linkName { + t.Fatalf("Expected LinkName, got %s", link.GetName()) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // test that a message is received when a link spec is updated + patch := map[string]any{ + "spec": map[string]any{ + "probeSpec": map[string]any{ + "timeout": "60s", + }, + }, + } + patchBytes, err := json.Marshal(patch) + if err != nil { + log.Fatalf("Failed to marshal patch: %v", err) + } + _, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch( + context.Background(), + linkName, + types.MergePatchType, + patchBytes, + metav1.PatchOptions{}, + ) + if err != nil { + t.Fatalf("Failed to patch link: %s", err) + } + + select { + case link := <-results: + if link.GetName() != linkName { + t.Fatalf("Expected LinkName, got %s", link.GetName()) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // test that a message is _not_ received when a link status is updated + patch = map[string]any{ + "status": map[string]any{ + "foo": "bar", + }, + } + patchBytes, err = json.Marshal(patch) + if err != nil { + log.Fatalf("Failed to marshal patch: %v", err) + } + _, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch( + context.Background(), + linkName, + types.MergePatchType, + patchBytes, + metav1.PatchOptions{}, + "status", + ) + if err != nil { + t.Fatalf("Failed to patch link: %s", err) + } + + select { + case link := <-results: + t.Fatalf("Received unexpected message: %v", link) + case <-time.After(time.Second): + } + + // test that a nil message is received when a link is deleted + if err := l5dAPI.LinkV1alpha2().Links(nsName).Delete(context.Background(), linkName, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete link: %s", err) + } + select { + case link := <-results: + if link != nil { + t.Fatalf("Expected nil, got %v", link) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } +} diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 8f4dafb68a5ca..492f2a2dcea3c 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -21,6 +21,7 @@ const ( EndpointSlices = "endpointslices" ExtWorkload = "externalworkload" Job = "job" + Link = "link" MeshTLSAuthentication = "meshtlsauthentication" MutatingWebhookConfig = "mutatingwebhookconfig" Namespace = "namespace"