From b8e5ece76b0529a4fb667f9ecdd7f10c9bede221 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Tue, 27 Feb 2024 20:42:18 -0700 Subject: [PATCH 01/16] Use EndpointSlice and propagate zone metadata to consul service --- control-plane/catalog/to-consul/resource.go | 258 ++++++++++-------- .../catalog/to-consul/resource_test.go | 132 +++++---- 2 files changed, 217 insertions(+), 173 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 2d29d6c15a..bf4d729877 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -17,6 +17,7 @@ import ( consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -33,10 +34,11 @@ const ( // ConsulK8SNS is the key used in the meta to record the namespace // of the service/node registration. - ConsulK8SNS = "external-k8s-ns" - ConsulK8SRefKind = "external-k8s-ref-kind" - ConsulK8SRefValue = "external-k8s-ref-name" - ConsulK8SNodeName = "external-k8s-node-name" + ConsulK8SNS = "external-k8s-ns" + ConsulK8SRefKind = "external-k8s-ref-kind" + ConsulK8SRefValue = "external-k8s-ref-name" + ConsulK8SNodeName = "external-k8s-node-name" + ConsulK8STopologyZone = "external-k8s-topology-zone" // consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status. consulKubernetesCheckType = "kubernetes-readiness" @@ -143,9 +145,9 @@ type ServiceResource struct { // in the form /. serviceMap map[string]*corev1.Service - // endpointsMap uses the same keys as serviceMap but maps to the endpoints + // endpointSliceListMap uses the same keys as serviceMap but maps to the EndpointSliceList // of each service. - endpointsMap map[string]*corev1.Endpoints + endpointSliceListMap map[string]*discoveryv1.EndpointSliceList // EnableIngress enables syncing of the hostname from an Ingress resource // to the service registration if an Ingress rule matches the service. @@ -227,19 +229,23 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { // If we care about endpoints, we should do the initial endpoints load. if t.shouldTrackEndpoints(key) { - endpoints, err := t.Client.CoreV1(). - Endpoints(service.Namespace). - Get(t.Ctx, service.Name, metav1.GetOptions{}) + + // account for continue + endpointSliceList, err := t.Client.DiscoveryV1(). + EndpointSlices(service.Namespace). 
+ List(t.Ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)}) + if err != nil { - t.Log.Warn("error loading initial endpoints", + t.Log.Warn("error loading initial endpoint slices list", "key", key, "err", err) + return err // Ensure to handle or return the error appropriately } else { - if t.endpointsMap == nil { - t.endpointsMap = make(map[string]*corev1.Endpoints) + if t.endpointSliceListMap == nil { + t.endpointSliceListMap = make(map[string]*discoveryv1.EndpointSliceList) } - t.endpointsMap[key] = endpoints - t.Log.Debug("[ServiceResource.Upsert] adding service's endpoints to endpointsMap", "key", key, "service", service, "endpoints", endpoints) + t.endpointSliceListMap[key] = endpointSliceList + t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSliceListMap", "key", key, "service", service, "endpointSliceList", endpointSliceList) } } @@ -265,8 +271,8 @@ func (t *ServiceResource) Delete(key string, _ interface{}) error { func (t *ServiceResource) doDelete(key string) { delete(t.serviceMap, key) t.Log.Debug("[doDelete] deleting service from serviceMap", "key", key) - delete(t.endpointsMap, key) - t.Log.Debug("[doDelete] deleting endpoints from endpointsMap", "key", key) + delete(t.endpointSliceListMap, key) + t.Log.Debug("[doDelete] deleting endpoints from endpointSliceListMap", "key", key) // If there were registrations related to this service, then // delete them and sync. if _, ok := t.consulMap[key]; ok { @@ -582,25 +588,24 @@ func (t *ServiceResource) generateRegistrations(key string) { // pods are running on. This way we don't register _every_ K8S // node as part of the service. case corev1.ServiceTypeNodePort: - if t.endpointsMap == nil { + if t.endpointSliceListMap == nil { return } - endpoints := t.endpointsMap[key] - if endpoints == nil { + endpointsList := t.endpointSliceListMap[key] + if endpointsList == nil { return } - for _, subset := range endpoints.Subsets { - for _, subsetAddr := range subset.Addresses { + for _, endpointSlice := range endpointsList.Items { + for _, endpoint := range endpointSlice.Endpoints { // Check that the node name exists // subsetAddr.NodeName is of type *string - if subsetAddr.NodeName == nil { + if endpoint.NodeName == nil { continue } - // Look up the node's ip address by getting node info - node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *subsetAddr.NodeName, metav1.GetOptions{}) + node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) if err != nil { t.Log.Warn("error getting node info", "error", err) continue @@ -614,37 +619,18 @@ func (t *ServiceResource) generateRegistrations(key string) { expectedType = corev1.NodeExternalIP } - // Find the ip address for the node and - // create the Consul service using it - var found bool - for _, address := range node.Status.Addresses { - if address.Type == expectedType { - found = true - r := baseNode - rs := baseService - r.Service = &rs - r.Service.ID = serviceID(r.Service.Service, subsetAddr.IP) - r.Service.Address = address.Address - - t.consulMap[key] = append(t.consulMap[key], &r) - // Only consider the first address that matches. In some cases - // there will be multiple addresses like when using AWS CNI. - // In those cases, Kubernetes will ensure eth0 is always the first - // address in the list. 
- // See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464 - break - } - } + for _, endpointAddr := range endpoint.Addresses { - // If an ExternalIP wasn't found, and ExternalFirst is set, - // use an InternalIP - if t.NodePortSync == ExternalFirst && !found { + // Find the ip address for the node and + // create the Consul service using it + var found bool for _, address := range node.Status.Addresses { - if address.Type == corev1.NodeInternalIP { + if address.Type == expectedType { + found = true r := baseNode rs := baseService r.Service = &rs - r.Service.ID = serviceID(r.Service.Service, subsetAddr.IP) + r.Service.ID = serviceID(r.Service.Service, endpointAddr) r.Service.Address = address.Address t.consulMap[key] = append(t.consulMap[key], &r) @@ -656,6 +642,29 @@ func (t *ServiceResource) generateRegistrations(key string) { break } } + + // If an ExternalIP wasn't found, and ExternalFirst is set, + // use an InternalIP + if t.NodePortSync == ExternalFirst && !found { + for _, address := range node.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, endpointAddr) + r.Service.Address = address.Address + + t.consulMap[key] = append(t.consulMap[key], &r) + // Only consider the first address that matches. In some cases + // there will be multiple addresses like when using AWS CNI. + // In those cases, Kubernetes will ensure eth0 is always the first + // address in the list. + // See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464 + break + } + } + } + } } } @@ -675,94 +684,100 @@ func (t *ServiceResource) registerServiceInstance( overridePortNumber int, useHostname bool) { - if t.endpointsMap == nil { + if t.endpointSliceListMap == nil { return } - endpoints := t.endpointsMap[key] - if endpoints == nil { + endpointSliceList := t.endpointSliceListMap[key] + if endpointSliceList == nil { return } seen := map[string]struct{}{} - for _, subset := range endpoints.Subsets { + for _, endpointSlices := range endpointSliceList.Items { // For ClusterIP services and if LoadBalancerEndpointsSync is true, we use the endpoint port instead // of the service port because we're registering each endpoint // as a separate service instance. epPort := baseService.Port if overridePortName != "" { // If we're supposed to use a specific named port, find it. - for _, p := range subset.Ports { - if overridePortName == p.Name { - epPort = int(p.Port) + for _, p := range endpointSlices.Ports { + if overridePortName == *p.Name { + epPort = int(*p.Port) break } } } else if overridePortNumber == 0 { // Otherwise we'll just use the first port in the list // (unless the port number was overridden by an annotation). - for _, p := range subset.Ports { - epPort = int(p.Port) + for _, p := range endpointSlices.Ports { + epPort = int(*p.Port) break } } - for _, subsetAddr := range subset.Addresses { - var addr string - // Use the address and port from the Ingress resource if - // ingress-sync is enabled and the service has an ingress - // resource that references it. 
-			if t.EnableIngress && t.isIngressService(key) {
-				addr = t.serviceHostnameMap[key].hostName
-				epPort = int(t.serviceHostnameMap[key].port)
-			} else {
-				addr = subsetAddr.IP
-				if addr == "" && useHostname {
-					addr = subsetAddr.Hostname
+		for _, endpoint := range endpointSlices.Endpoints {
+			for _, endpointAddr := range endpoint.Addresses {
+
+				var addr string
+				// Use the address and port from the Ingress resource if
+				// ingress-sync is enabled and the service has an ingress
+				// resource that references it.
+				if t.EnableIngress && t.isIngressService(key) {
+					addr = t.serviceHostnameMap[key].hostName
+					epPort = int(t.serviceHostnameMap[key].port)
+				} else {
+					addr = endpointAddr
+					// endpoint.Hostname is a *string, so guard against a nil
+					// dereference before falling back to it.
+					if addr == "" && useHostname && endpoint.Hostname != nil {
+						addr = *endpoint.Hostname
+					}
+					if addr == "" {
+						continue
+					}
 				}
-				if addr == "" {
+
+				// It's not clear whether K8S guarantees ready addresses to
+				// be unique so we maintain a set to prevent duplicates just
+				// in case.
+				if _, ok := seen[addr]; ok {
 					continue
 				}
-			}
+				seen[addr] = struct{}{}
 
-			// Its not clear whether K8S guarantees ready addresses to
-			// be unique so we maintain a set to prevent duplicates just
-			// in case.
-			if _, ok := seen[addr]; ok {
-				continue
-			}
-			seen[addr] = struct{}{}
+				r := baseNode
+				rs := baseService
+				r.Service = &rs
+				r.Service.ID = serviceID(r.Service.Service, addr)
+				r.Service.Address = addr
+				r.Service.Port = epPort
+				r.Service.Meta = make(map[string]string)
+				// Deepcopy baseService.Meta into r.Service.Meta as baseService is shared
+				// between all nodes of a service
+				for k, v := range baseService.Meta {
+					r.Service.Meta[k] = v
+				}
+				if endpoint.TargetRef != nil {
+					r.Service.Meta[ConsulK8SRefValue] = endpoint.TargetRef.Name
+					r.Service.Meta[ConsulK8SRefKind] = endpoint.TargetRef.Kind
+				}
+				if endpoint.NodeName != nil {
+					r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName
+				}
+				if endpoint.Zone != nil {
+					r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
+				}
 
-			r := baseNode
-			rs := baseService
-			r.Service = &rs
-			r.Service.ID = serviceID(r.Service.Service, addr)
-			r.Service.Address = addr
-			r.Service.Port = epPort
-			r.Service.Meta = make(map[string]string)
-			// Deepcopy baseService.Meta into r.Service.Meta as baseService is shared
-			// between all nodes of a service
-			for k, v := range baseService.Meta {
-				r.Service.Meta[k] = v
-			}
-			if subsetAddr.TargetRef != nil {
-				r.Service.Meta[ConsulK8SRefValue] = subsetAddr.TargetRef.Name
-				r.Service.Meta[ConsulK8SRefKind] = subsetAddr.TargetRef.Kind
-			}
-			if subsetAddr.NodeName != nil {
-				r.Service.Meta[ConsulK8SNodeName] = *subsetAddr.NodeName
-			}
+				r.Check = &consulapi.AgentCheck{
+					CheckID:   consulHealthCheckID(endpointSlices.Namespace, serviceID(r.Service.Service, addr)),
+					Name:      consulKubernetesCheckName,
+					Namespace: baseService.Namespace,
+					Type:      consulKubernetesCheckType,
+					Status:    consulapi.HealthPassing,
+					ServiceID: serviceID(r.Service.Service, addr),
+					Output:    kubernetesSuccessReasonMsg,
+				}
 
-			r.Check = &consulapi.AgentCheck{
-				CheckID:   consulHealthCheckID(endpoints.Namespace, serviceID(r.Service.Service, addr)),
-				Name:      consulKubernetesCheckName,
-				Namespace: baseService.Namespace,
-				Type:      consulKubernetesCheckType,
-				Status:    consulapi.HealthPassing,
-				ServiceID: serviceID(r.Service.Service, addr),
-				Output:    kubernetesSuccessReasonMsg,
+				t.consulMap[key] = append(t.consulMap[key], &r)
 			}
-
-			t.consulMap[key] = append(t.consulMap[key], &r)
 		}
 	}
 }
@@ -811,18 +826,19 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer {
 	return cache.NewSharedIndexInformer(
&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return t.Service.Client.CoreV1(). - Endpoints(metav1.NamespaceAll). + + return t.Service.Client.DiscoveryV1(). + EndpointSlices(metav1.NamespaceAll). List(t.Ctx, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return t.Service.Client.CoreV1(). - Endpoints(metav1.NamespaceAll). + return t.Service.Client.DiscoveryV1(). + EndpointSlices(metav1.NamespaceAll). Watch(t.Ctx, options) }, }, - &corev1.Endpoints{}, + &discoveryv1.EndpointSliceList{}, 0, cache.Indexers{}, ) @@ -830,10 +846,10 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { svc := t.Service - endpoints, ok := raw.(*corev1.Endpoints) + + endpointSliceList, ok := raw.(*discoveryv1.EndpointSliceList) if !ok { svc.Log.Warn("upsert got invalid type", "raw", raw) - return nil } svc.serviceLock.Lock() @@ -845,10 +861,10 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { } // We are tracking this service so let's keep track of the endpoints - if svc.endpointsMap == nil { - svc.endpointsMap = make(map[string]*corev1.Endpoints) + if svc.endpointSliceListMap == nil { + svc.endpointSliceListMap = make(map[string]*discoveryv1.EndpointSliceList) } - svc.endpointsMap[key] = endpoints + svc.endpointSliceListMap[key] = endpointSliceList // Update the registration and trigger a sync svc.generateRegistrations(key) @@ -864,8 +880,8 @@ func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error { // This is a bit of an optimization. We only want to force a resync // if we were tracking this endpoint to begin with and that endpoint // had associated registrations. 
- if _, ok := t.Service.endpointsMap[key]; ok { - delete(t.Service.endpointsMap, key) + if _, ok := t.Service.endpointSliceListMap[key]; ok { + delete(t.Service.endpointSliceListMap, key) if _, ok := t.Service.consulMap[key]; ok { delete(t.Service.consulMap, key) t.Service.sync() diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 3b8fb78497..0c72b3844c 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -14,11 +14,13 @@ import ( "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/pointer" ) const nodeName1 = "ip-10-11-12-13.ec2.internal" @@ -759,27 +761,40 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { node1, _ := createNodes(t, client) - // Insert the endpoints - _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create( + // Insert the endpoint slices + _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( context.Background(), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - {NodeName: &node1.Name, IP: "8.8.8.8"}, - }, - Ports: []corev1.EndpointPort{ - {Name: "http", Port: 8080}, - {Name: "rpc", Port: 2000}, + Addresses: []string{"8.8.8.8"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), }, + TargetRef: &corev1.ObjectReference{Kind: "pod", Name: "foo", Namespace: metav1.NamespaceDefault}, + NodeName: &node1.Name, + Zone: pointer.String("us-west-2a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.String("http"), + Port: pointer.Int32(8080), + }, + { + Name: pointer.String("rpc"), + Port: pointer.Int32(2000), }, }, }, - metav1.CreateOptions{}) + metav1.CreateOptions{}, + ) require.NoError(t, err) // Insert an LB service @@ -814,7 +829,7 @@ func TestServiceResource_nodePort(t *testing.T) { createNodes(t, client) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) @@ -854,7 +869,7 @@ func TestServiceResource_nodePortPrefix(t *testing.T) { createNodes(t, client) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) @@ -949,7 +964,7 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { createNodes(t, client) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) @@ -989,7 +1004,7 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { createNodes(t, client) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", 
metav1.NamespaceDefault) @@ -1034,7 +1049,7 @@ func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { createNodes(t, client) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) @@ -1082,7 +1097,7 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { _, err := client.CoreV1().Nodes().UpdateStatus(context.Background(), node1, metav1.UpdateOptions{}) require.NoError(t, err) - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) @@ -1125,7 +1140,7 @@ func TestServiceResource_clusterIP(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1161,7 +1176,7 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1199,7 +1214,7 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1237,7 +1252,7 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1275,7 +1290,7 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1315,7 +1330,7 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1352,7 +1367,7 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1382,7 +1397,7 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", testNamespace) + createEndpointSlice(t, client, "foo", testNamespace) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1423,7 +1438,7 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ 
-1459,7 +1474,7 @@ func TestServiceResource_targetRefInMeta(t *testing.T) { require.NoError(t, err) // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1945,7 +1960,7 @@ func TestServiceResource_addIngress(t *testing.T) { // Create the ingress _, err = client.NetworkingV1().Ingresses(metav1.NamespaceDefault).Create(context.Background(), test.ingress, metav1.CreateOptions{}) require.NoError(t, err) - createEndpoints(t, client, "test-service", metav1.NamespaceDefault) + createEndpointSlice(t, client, "test-service", metav1.NamespaceDefault) // Verify that the service name annotation is preferred retry.Run(t, func(r *retry.R) { syncer.Lock() @@ -2066,43 +2081,56 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No return node1, node2 } -// createEndpoints calls the fake k8s client to create two endpoints on two nodes. -func createEndpoints(t *testing.T, client *fake.Clientset, serviceName string, namespace string) { +// createEndpoints calls the fake k8s client to create two endpoint slices across two nodes. +func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName string, namespace string) { node1 := nodeName1 node2 := nodeName2 targetRef := corev1.ObjectReference{Kind: "pod", Name: "foobar"} - _, err := client.CoreV1().Endpoints(namespace).Create( + + _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( context.Background(), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - Namespace: namespace, + GenerateName: serviceName + "-", + Namespace: namespace, }, - - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - {NodeName: &node1, IP: "1.1.1.1", TargetRef: &targetRef}, - }, - Ports: []corev1.EndpointPort{ - {Name: "http", Port: 8080}, - {Name: "rpc", Port: 2000}, + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), }, + TargetRef: &targetRef, + NodeName: &node1, + Zone: pointer.String("us-west-2a"), }, - { - Addresses: []corev1.EndpointAddress{ - {NodeName: &node2, IP: "2.2.2.2"}, - }, - Ports: []corev1.EndpointPort{ - {Name: "http", Port: 8080}, - {Name: "rpc", Port: 2000}, + Addresses: []string{"2.2.2.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), }, + NodeName: &node2, + Zone: pointer.String("us-west-2b"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.String("http"), + Port: pointer.Int32(8080), + }, + { + Name: pointer.String("rpc"), + Port: pointer.Int32(2000), }, }, }, - metav1.CreateOptions{}) - + metav1.CreateOptions{}, + ) require.NoError(t, err) } From ee26768ad44aa5db0662c3d133de6c5eca4cae14 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 01:44:21 -0700 Subject: [PATCH 02/16] Fix tests --- control-plane/catalog/to-consul/resource.go | 35 ++++++------- .../catalog/to-consul/resource_test.go | 51 ++++++++++++------- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index bf4d729877..ea3224f2b7 100644 --- 
a/control-plane/catalog/to-consul/resource.go
+++ b/control-plane/catalog/to-consul/resource.go
@@ -147,7 +147,7 @@ type ServiceResource struct {
 	// endpointSliceListMap uses the same keys as serviceMap but maps to the EndpointSliceList
 	// of each service.
-	endpointSliceListMap map[string]*discoveryv1.EndpointSliceList
+	endpointSliceListMap map[string][]discoveryv1.EndpointSlice
 
 	// EnableIngress enables syncing of the hostname from an Ingress resource
 	// to the service registration if an Ingress rule matches the service.
@@ -201,7 +201,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
 	// We expect a Service. If it isn't a service then just ignore it.
 	service, ok := raw.(*corev1.Service)
 	if !ok {
-		t.Log.Warn("upsert got invalid type", "raw", raw)
+		t.Log.Warn("upsert got invalid type svc", "raw", raw)
 		return nil
 	}
@@ -242,9 +242,9 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
 			return err // Ensure to handle or return the error appropriately
 		} else {
 			if t.endpointSliceListMap == nil {
-				t.endpointSliceListMap = make(map[string]*discoveryv1.EndpointSliceList)
+				t.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
 			}
-			t.endpointSliceListMap[key] = endpointSliceList
+			t.endpointSliceListMap[key] = endpointSliceList.Items
 			t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSliceListMap", "key", key, "service", service, "endpointSliceList", endpointSliceList)
 		}
 	}
@@ -592,12 +592,12 @@ func (t *ServiceResource) generateRegistrations(key string) {
 			return
 		}
 
-		endpointsList := t.endpointSliceListMap[key]
-		if endpointsList == nil {
+		endpointSliceList := t.endpointSliceListMap[key]
+		if endpointSliceList == nil {
 			return
 		}
 
-		for _, endpointSlice := range endpointsList.Items {
+		for _, endpointSlice := range endpointSliceList {
 			for _, endpoint := range endpointSlice.Endpoints {
 				// Check that the node name exists
-				// subsetAddr.NodeName is of type *string
+				// endpoint.NodeName is of type *string
 				if endpoint.NodeName == nil {
@@ -694,14 +694,14 @@ func (t *ServiceResource) registerServiceInstance(
 	}
 
 	seen := map[string]struct{}{}
-	for _, endpointSlices := range endpointSliceList.Items {
+	for _, endpointSlice := range endpointSliceList {
 		// For ClusterIP services and if LoadBalancerEndpointsSync is true, we use the endpoint port instead
 		// of the service port because we're registering each endpoint
 		// as a separate service instance.
 		epPort := baseService.Port
 		if overridePortName != "" {
 			// If we're supposed to use a specific named port, find it.
-			for _, p := range endpointSlices.Ports {
+			for _, p := range endpointSlice.Ports {
 				if overridePortName == *p.Name {
 					epPort = int(*p.Port)
 					break
@@ -710,12 +710,12 @@ func (t *ServiceResource) registerServiceInstance(
 		} else if overridePortNumber == 0 {
 			// Otherwise we'll just use the first port in the list
 			// (unless the port number was overridden by an annotation).
- for _, p := range endpointSlices.Ports { + for _, p := range endpointSlice.Ports { epPort = int(*p.Port) break } } - for _, endpoint := range endpointSlices.Endpoints { + for _, endpoint := range endpointSlice.Endpoints { for _, endpointAddr := range endpoint.Addresses { var addr string @@ -767,7 +767,7 @@ func (t *ServiceResource) registerServiceInstance( } r.Check = &consulapi.AgentCheck{ - CheckID: consulHealthCheckID(endpointSlices.Namespace, serviceID(r.Service.Service, addr)), + CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)), Name: consulKubernetesCheckName, Namespace: baseService.Namespace, Type: consulKubernetesCheckType, @@ -847,9 +847,9 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { svc := t.Service - endpointSliceList, ok := raw.(*discoveryv1.EndpointSliceList) + endpointSlice, ok := raw.(*discoveryv1.EndpointSlice) if !ok { - svc.Log.Warn("upsert got invalid type", "raw", raw) + svc.Log.Warn("upsert got invalid type for ep", "raw", raw) } svc.serviceLock.Lock() @@ -862,9 +862,10 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { // We are tracking this service so let's keep track of the endpoints if svc.endpointSliceListMap == nil { - svc.endpointSliceListMap = make(map[string]*discoveryv1.EndpointSliceList) + svc.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice) } - svc.endpointSliceListMap[key] = endpointSliceList + //svc.endpointSliceListMap[key] = &discoveryv1.EndpointSliceList{Items: []discoveryv1.EndpointSlice{*endpointSliceList}} + svc.endpointSliceListMap[key] = []discoveryv1.EndpointSlice{*endpointSlice} // Update the registration and trigger a sync svc.generateRegistrations(key) @@ -931,7 +932,7 @@ func (t *serviceIngressResource) Upsert(key string, raw interface{}) error { svc := t.Service ingress, ok := raw.(*networkingv1.Ingress) if !ok { - svc.Log.Warn("upsert got invalid type", "raw", raw) + svc.Log.Warn("upsert got invalid type ing", "raw", raw) return nil } diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 0c72b3844c..21a3ac1655 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -766,7 +766,8 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { context.Background(), &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + GenerateName: "foo-", + Labels: map[string]string{discoveryv1.LabelServiceName: "foo"}, }, AddressType: discoveryv1.AddressTypeIPv4, Endpoints: []discoveryv1.Endpoint{ @@ -909,27 +910,43 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { node1, _ := createNodes(t, client) - // Insert the endpoints - _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create( + // Insert the endpoint slice + + _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( context.Background(), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + GenerateName: "foo-", + Labels: map[string]string{discoveryv1.LabelServiceName: "foo"}, }, - - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - {NodeName: &node1.Name, IP: "1.2.3.4"}, - }, - Ports: []corev1.EndpointPort{ - {Name: "http", Port: 8080}, - {Name: 
"rpc", Port: 2000}, + Addresses: []string{"1.2.3.4"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), }, + TargetRef: &corev1.ObjectReference{Kind: "pod", Name: "foo", Namespace: metav1.NamespaceDefault}, + NodeName: &node1.Name, + Zone: pointer.String("us-west-2a"), + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: pointer.String("http"), + Port: pointer.Int32(8080), + }, + { + Name: pointer.String("rpc"), + Port: pointer.Int32(2000), }, }, }, - metav1.CreateOptions{}) + metav1.CreateOptions{}, + ) + require.NoError(t, err) // Insert the service @@ -2081,18 +2098,18 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No return node1, node2 } -// createEndpoints calls the fake k8s client to create two endpoint slices across two nodes. +// createEndpointSlices calls the fake k8s client to create an endpoint slices with two endpoints on different nodes. func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName string, namespace string) { node1 := nodeName1 node2 := nodeName2 targetRef := corev1.ObjectReference{Kind: "pod", Name: "foobar"} - _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( + _, err := client.DiscoveryV1().EndpointSlices(namespace).Create( context.Background(), &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ GenerateName: serviceName + "-", - Namespace: namespace, + Labels: map[string]string{discoveryv1.LabelServiceName: serviceName}, }, AddressType: discoveryv1.AddressTypeIPv4, Endpoints: []discoveryv1.Endpoint{ From 4a3c20906afad3ee42d9b17972e0bf62351081a9 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 02:31:14 -0700 Subject: [PATCH 03/16] Add test for zone metadata --- control-plane/catalog/to-consul/resource_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 21a3ac1655..8d408686b4 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -1171,6 +1171,8 @@ func TestServiceResource_clusterIP(t *testing.T) { require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) + require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"]) + require.Equal(r, "us-west-2b", actual[1].Service.Meta["external-k8s-topology-zone"]) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) }) } From c11b0343a90beefa170e5f637e2f9d5e6a9691a1 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 08:26:03 -0700 Subject: [PATCH 04/16] Cleanup and changelog entry --- .changelog/3693.txt | 3 + control-plane/catalog/to-consul/resource.go | 82 +++++++++++-------- .../catalog/to-consul/resource_test.go | 33 ++++---- 3 files changed, 67 insertions(+), 51 deletions(-) create mode 100644 .changelog/3693.txt diff --git a/.changelog/3693.txt b/.changelog/3693.txt new file mode 100644 index 0000000000..ef5ef42b64 --- /dev/null +++ b/.changelog/3693.txt @@ -0,0 +1,3 @@ +```release-note:improvement +catalog: Topology Zone information is now read from the Kubernetes endpoints and added to registered consul services under Meta. 
+``` \ No newline at end of file diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index ea3224f2b7..d751033998 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -145,9 +145,9 @@ type ServiceResource struct { // in the form /. serviceMap map[string]*corev1.Service - // endpointSliceListMap uses the same keys as serviceMap but maps to the EndpointSliceList + // endpointSlicesMap uses the same keys as serviceMap but maps to a list of EndpointSlices // of each service. - endpointSliceListMap map[string][]discoveryv1.EndpointSlice + endpointSlicesMap map[string][]discoveryv1.EndpointSlice // EnableIngress enables syncing of the hostname from an Ingress resource // to the service registration if an Ingress rule matches the service. @@ -201,7 +201,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { // We expect a Service. If it isn't a service then just ignore it. service, ok := raw.(*corev1.Service) if !ok { - t.Log.Warn("upsert got invalid type svc", "raw", raw) + t.Log.Warn("upsert got invalid type", "raw", raw) return nil } @@ -227,26 +227,45 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { t.serviceMap[key] = service t.Log.Debug("[ServiceResource.Upsert] adding service to serviceMap", "key", key, "service", service) - // If we care about endpoints, we should do the initial endpoints load. + // If we care about endpoints, we should load the associated endpoint slices. if t.shouldTrackEndpoints(key) { - // account for continue - endpointSliceList, err := t.Client.DiscoveryV1(). - EndpointSlices(service.Namespace). - List(t.Ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)}) + var allEndpointSlices []discoveryv1.EndpointSlice + labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name) + continueToken := "" + limit := int64(100) - if err != nil { - t.Log.Warn("error loading initial endpoint slices list", - "key", key, - "err", err) - return err // Ensure to handle or return the error appropriately - } else { - if t.endpointSliceListMap == nil { - t.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice) + for { + opts := metav1.ListOptions{ + LabelSelector: labelSelector, + Limit: limit, + Continue: continueToken, + } + endpointSliceList, err := t.Client.DiscoveryV1(). + EndpointSlices(service.Namespace). + List(t.Ctx, opts) + + if err != nil { + t.Log.Warn("error loading endpoint slices list", + "key", key, + "err", err) + return err } - t.endpointSliceListMap[key] = endpointSliceList.Items - t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSliceListMap", "key", key, "service", service, "endpointSliceList", endpointSliceList) + + allEndpointSlices = append(allEndpointSlices, endpointSliceList.Items...) 
+
+		if endpointSliceList.Continue != "" {
+			continueToken = endpointSliceList.Continue
+		} else {
+			break
+		}
+	}
+
+	if t.endpointSlicesMap == nil {
+		t.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
 	}
+	t.endpointSlicesMap[key] = allEndpointSlices
+	t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
 	}
 
 	// Update the registration and trigger a sync
@@ -271,8 +290,8 @@ func (t *ServiceResource) Delete(key string, _ interface{}) error {
 func (t *ServiceResource) doDelete(key string) {
 	delete(t.serviceMap, key)
 	t.Log.Debug("[doDelete] deleting service from serviceMap", "key", key)
-	delete(t.endpointSliceListMap, key)
-	t.Log.Debug("[doDelete] deleting endpoints from endpointSliceListMap", "key", key)
+	delete(t.endpointSlicesMap, key)
+	t.Log.Debug("[doDelete] deleting endpoints from endpointSlicesMap", "key", key)
 	// If there were registrations related to this service, then
 	// delete them and sync.
 	if _, ok := t.consulMap[key]; ok {
@@ -588,11 +607,11 @@ func (t *ServiceResource) generateRegistrations(key string) {
 	// pods are running on. This way we don't register _every_ K8S
 	// node as part of the service.
 	case corev1.ServiceTypeNodePort:
-		if t.endpointSliceListMap == nil {
+		if t.endpointSlicesMap == nil {
 			return
 		}
 
-		endpointSliceList := t.endpointSliceListMap[key]
+		endpointSliceList := t.endpointSlicesMap[key]
 		if endpointSliceList == nil {
 			return
 		}
@@ -684,11 +703,11 @@ func (t *ServiceResource) registerServiceInstance(
 	overridePortNumber int,
 	useHostname bool) {
 
-	if t.endpointSliceListMap == nil {
+	if t.endpointSlicesMap == nil {
 		return
 	}
 
-	endpointSliceList := t.endpointSliceListMap[key]
+	endpointSliceList := t.endpointSlicesMap[key]
 	if endpointSliceList == nil {
 		return
 	}
@@ -849,7 +868,7 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
 
 	endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
 	if !ok {
-		svc.Log.Warn("upsert got invalid type for ep", "raw", raw)
+		svc.Log.Warn("upsert got invalid type", "raw", raw)
+		return nil
 	}
 
 	svc.serviceLock.Lock()
@@ -861,11 +880,10 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
 
 	// We are tracking this service so let's keep track of the endpoints
-	if svc.endpointSliceListMap == nil {
-		svc.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
+	if svc.endpointSlicesMap == nil {
+		svc.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
 	}
-	//svc.endpointSliceListMap[key] = &discoveryv1.EndpointSliceList{Items: []discoveryv1.EndpointSlice{*endpointSliceList}}
-	svc.endpointSliceListMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}
+	svc.endpointSlicesMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}
 
 	// Update the registration and trigger a sync
 	svc.generateRegistrations(key)
@@ -881,8 +899,8 @@ func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error {
 	// This is a bit of an optimization. We only want to force a resync
 	// if we were tracking this endpoint to begin with and that endpoint
 	// had associated registrations.
- if _, ok := t.Service.endpointSliceListMap[key]; ok { - delete(t.Service.endpointSliceListMap, key) + if _, ok := t.Service.endpointSlicesMap[key]; ok { + delete(t.Service.endpointSlicesMap, key) if _, ok := t.Service.consulMap[key]; ok { delete(t.Service.consulMap, key) t.Service.sync() @@ -932,7 +950,7 @@ func (t *serviceIngressResource) Upsert(key string, raw interface{}) error { svc := t.Service ingress, ok := raw.(*networkingv1.Ingress) if !ok { - svc.Log.Warn("upsert got invalid type ing", "raw", raw) + svc.Log.Warn("upsert got invalid type", "raw", raw) return nil } diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 8d408686b4..5e0f938c7a 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -761,7 +761,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { node1, _ := createNodes(t, client) - // Insert the endpoint slices + // Insert the endpoint slice _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( context.Background(), &discoveryv1.EndpointSlice{ @@ -794,8 +794,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) { }, }, }, - metav1.CreateOptions{}, - ) + metav1.CreateOptions{}) require.NoError(t, err) // Insert an LB service @@ -911,7 +910,6 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { node1, _ := createNodes(t, client) // Insert the endpoint slice - _, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create( context.Background(), &discoveryv1.EndpointSlice{ @@ -944,9 +942,7 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { }, }, }, - metav1.CreateOptions{}, - ) - + metav1.CreateOptions{}) require.NoError(t, err) // Insert the service @@ -1156,7 +1152,7 @@ func TestServiceResource_clusterIP(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1194,7 +1190,7 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1232,7 +1228,7 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1270,7 +1266,7 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1308,7 +1304,7 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // 
Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1348,7 +1344,7 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1385,7 +1381,7 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1415,7 +1411,7 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { _, err := client.CoreV1().Services(testNamespace).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", testNamespace) // Verify what we got @@ -1456,7 +1452,7 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -1492,7 +1488,7 @@ func TestServiceResource_targetRefInMeta(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got @@ -2148,8 +2144,7 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin }, }, }, - metav1.CreateOptions{}, - ) + metav1.CreateOptions{}) require.NoError(t, err) } From 093826a5e77895e143149bdf023b4c83eae1e8ce Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 09:37:59 -0700 Subject: [PATCH 05/16] Fix clusterrole permissions and type on Informer --- charts/consul/templates/sync-catalog-clusterrole.yaml | 8 +++++++- control-plane/catalog/to-consul/resource.go | 3 +-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml index 585b5ad171..d2e0cac45e 100644 --- a/charts/consul/templates/sync-catalog-clusterrole.yaml +++ b/charts/consul/templates/sync-catalog-clusterrole.yaml @@ -14,7 +14,13 @@ rules: - apiGroups: [ "" ] resources: - services - - endpoints + verbs: + - get + - list + - watch +- apiGroups: ["discovery.k8s.io"] + resources: + - endpointslices verbs: - get - list diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index d751033998..aa337c9156 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -845,7 +845,6 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return t.Service.Client.DiscoveryV1(). EndpointSlices(metav1.NamespaceAll). 
List(t.Ctx, options) @@ -857,7 +856,7 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { Watch(t.Ctx, options) }, }, - &discoveryv1.EndpointSliceList{}, + &discoveryv1.EndpointSlice{}, 0, cache.Indexers{}, ) From 75810ab9e43860cdcb6c30e9a1c051fe811a0ff1 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 10:14:27 -0700 Subject: [PATCH 06/16] Include region info for NodePort services --- control-plane/catalog/to-consul/resource.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index aa337c9156..6c62a393df 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -34,11 +34,12 @@ const ( // ConsulK8SNS is the key used in the meta to record the namespace // of the service/node registration. - ConsulK8SNS = "external-k8s-ns" - ConsulK8SRefKind = "external-k8s-ref-kind" - ConsulK8SRefValue = "external-k8s-ref-name" - ConsulK8SNodeName = "external-k8s-node-name" - ConsulK8STopologyZone = "external-k8s-topology-zone" + ConsulK8SNS = "external-k8s-ns" + ConsulK8SRefKind = "external-k8s-ref-kind" + ConsulK8SRefValue = "external-k8s-ref-name" + ConsulK8SNodeName = "external-k8s-node-name" + ConsulK8STopologyRegion = "external-k8s-topology-region" + ConsulK8STopologyZone = "external-k8s-topology-zone" // consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status. consulKubernetesCheckType = "kubernetes-readiness" @@ -651,6 +652,9 @@ func (t *ServiceResource) generateRegistrations(key string) { r.Service = &rs r.Service.ID = serviceID(r.Service.Service, endpointAddr) r.Service.Address = address.Address + if region := node.Labels[corev1.LabelTopologyRegion]; region != "" { + r.Service.Meta[ConsulK8STopologyRegion] = region + } t.consulMap[key] = append(t.consulMap[key], &r) // Only consider the first address that matches. In some cases From f0f1762404863ccb2d51250802c91febe491dba5 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 28 Feb 2024 11:27:30 -0700 Subject: [PATCH 07/16] Include topology region for all service types --- control-plane/catalog/to-consul/resource.go | 13 ++++++++++--- control-plane/catalog/to-consul/resource_test.go | 10 ++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 6c62a393df..9e7fe28cb9 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -652,9 +652,6 @@ func (t *ServiceResource) generateRegistrations(key string) { r.Service = &rs r.Service.ID = serviceID(r.Service.Service, endpointAddr) r.Service.Address = address.Address - if region := node.Labels[corev1.LabelTopologyRegion]; region != "" { - r.Service.Meta[ConsulK8STopologyRegion] = region - } t.consulMap[key] = append(t.consulMap[key], &r) // Only consider the first address that matches. 
In some cases
@@ -739,6 +736,13 @@ func (t *ServiceResource) registerServiceInstance(
 			}
 		}
 		for _, endpoint := range endpointSlice.Endpoints {
+			// Look up the endpoint's node once so its topology region label can be
+			// added to the service meta below. NodeName may be nil, so guard the lookup.
+			var node *corev1.Node
+			if endpoint.NodeName != nil {
+				var err error
+				node, err = t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
+				if err != nil {
+					t.Log.Warn("error getting node info", "error", err)
+					continue
+				}
+			}
+
 			for _, endpointAddr := range endpoint.Addresses {
 
 				var addr string
@@ -788,6 +792,9 @@ func (t *ServiceResource) registerServiceInstance(
 				if endpoint.Zone != nil {
 					r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
 				}
+				if node != nil {
+					if region := node.Labels[corev1.LabelTopologyRegion]; region != "" {
+						r.Service.Meta[ConsulK8STopologyRegion] = region
+					}
+				}
 
 				r.Check = &consulapi.AgentCheck{
 					CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)),
diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go
index 5e0f938c7a..f38bfa52f2 100644
--- a/control-plane/catalog/to-consul/resource_test.go
+++ b/control-plane/catalog/to-consul/resource_test.go
@@ -1169,6 +1169,7 @@ func TestServiceResource_clusterIP(t *testing.T) {
 		require.Equal(r, 8080, actual[1].Service.Port)
 		require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"])
 		require.Equal(r, "us-west-2b", actual[1].Service.Meta["external-k8s-topology-zone"])
+		require.Equal(r, "us-west-2", actual[0].Service.Meta["external-k8s-topology-region"])
 		require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
 	})
 }
@@ -1975,7 +1976,10 @@ func TestServiceResource_addIngress(t *testing.T) {
 			// Create the ingress
 			_, err = client.NetworkingV1().Ingresses(metav1.NamespaceDefault).Create(context.Background(), test.ingress, metav1.CreateOptions{})
 			require.NoError(t, err)
+
+			createNodes(t, client)
 			createEndpointSlice(t, client, "test-service", metav1.NamespaceDefault)
+
 			// Verify that the service name annotation is preferred
 			retry.Run(t, func(r *retry.R) {
 				syncer.Lock()
@@ -2064,6 +2068,9 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No
 	node1 := &corev1.Node{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: nodeName1,
+			Labels: map[string]string{
+				corev1.LabelTopologyRegion: "us-west-2",
+			},
 		},
 
 		Status: corev1.NodeStatus{
@@ -2080,6 +2087,9 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No
 	node2 := &corev1.Node{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: nodeName2,
+			Labels: map[string]string{
+				corev1.LabelTopologyRegion: "us-west-2",
+			},
 		},
 
 		Status: corev1.NodeStatus{

From 395c4fe2d84a6d731efafbb84bfaf6ae3bc3225a Mon Sep 17 00:00:00 2001
From: jukie <10012479+Jukie@users.noreply.github.com>
Date: Wed, 28 Feb 2024 13:08:06 -0700
Subject: [PATCH 08/16] Update release note

---
 .changelog/3693.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.changelog/3693.txt b/.changelog/3693.txt
index ef5ef42b64..b26e6da0a4 100644
--- a/.changelog/3693.txt
+++ b/.changelog/3693.txt
@@ -1,3 +1,3 @@
 ```release-note:improvement
-catalog: Topology Zone information is now read from the Kubernetes endpoints and added to registered consul services under Meta.
+catalog: Topology zone and region information is now read from the Kubernetes EndpointSlices and their associated Nodes, and added to registered Consul services under Meta.
``` \ No newline at end of file From 73ffbcbb79969a12e94b05b24caa62b6d8cb5748 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Wed, 6 Mar 2024 19:26:48 -0700 Subject: [PATCH 09/16] Fix tests --- .../catalog/to-consul/resource_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index f38bfa52f2..a49c3b5790 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -1152,6 +1152,8 @@ func TestServiceResource_clusterIP(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1191,6 +1193,8 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1229,6 +1233,8 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1267,6 +1273,8 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1305,6 +1313,8 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1345,6 +1355,8 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1382,6 +1394,8 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1412,6 +1426,8 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) { _, err := client.CoreV1().Services(testNamespace).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", testNamespace) @@ -1453,6 +1469,8 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, 
err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) @@ -1489,6 +1507,8 @@ func TestServiceResource_targetRefInMeta(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + // Insert the endpoint slice createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) From cc5d60c832bcc87103624f694f92d96edc0527da Mon Sep 17 00:00:00 2001 From: John Murret Date: Thu, 7 Mar 2024 10:34:28 -0700 Subject: [PATCH 10/16] fix sync-catalog-clusterrole and tests --- charts/consul/templates/sync-catalog-clusterrole.yaml | 11 ++++++++++- charts/consul/test/unit/sync-catalog-clusterrole.bats | 10 +++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml index d2e0cac45e..839775663d 100644 --- a/charts/consul/templates/sync-catalog-clusterrole.yaml +++ b/charts/consul/templates/sync-catalog-clusterrole.yaml @@ -18,6 +18,15 @@ rules: - get - list - watch +<<<<<<< Updated upstream +======= +{{- if .Values.syncCatalog.toK8S }} + - update + - patch + - delete + - create +{{- end }} +>>>>>>> Stashed changes - apiGroups: ["discovery.k8s.io"] resources: - endpointslices @@ -51,4 +60,4 @@ rules: - get - list - watch -{{- end }} \ No newline at end of file +{{- end }} diff --git a/charts/consul/test/unit/sync-catalog-clusterrole.bats b/charts/consul/test/unit/sync-catalog-clusterrole.bats index 17141e434f..afc3a42b45 100755 --- a/charts/consul/test/unit/sync-catalog-clusterrole.bats +++ b/charts/consul/test/unit/sync-catalog-clusterrole.bats @@ -56,7 +56,7 @@ load _helpers --set 'syncCatalog.enabled=true' \ --set 'global.enablePodSecurityPolicies=true' \ . | tee /dev/stderr | - yq -r '.rules[2].resources[0]' | tee /dev/stderr) + yq -r '.rules[3].resources[0]' | tee /dev/stderr) [ "${actual}" = "podsecuritypolicies" ] } @@ -83,4 +83,12 @@ load _helpers . | tee /dev/stderr | yq -c '.rules[0].verbs' | tee /dev/stderr) [ "${actual}" = '["get","list","watch","update","patch","delete","create"]' ] + + actual=$(helm template \ + -s templates/sync-catalog-clusterrole.yaml \ + --set 'syncCatalog.enabled=true' \ + --set 'syncCatalog.toK8S=true' \ + . | tee /dev/stderr | + yq -c '.rules[0].verbs' | tee /dev/stderr) + [ "${actual}" = '["get","list","watch","update","patch","delete","create"]' ] } From 0d8b7ae89e4fafb8e3bc5ccc62ba87fbe7f90c6f Mon Sep 17 00:00:00 2001 From: John Murret Date: Thu, 7 Mar 2024 14:19:46 -0700 Subject: [PATCH 11/16] fix stash conflict --- charts/consul/templates/sync-catalog-clusterrole.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml index 839775663d..89ea9f3c5c 100644 --- a/charts/consul/templates/sync-catalog-clusterrole.yaml +++ b/charts/consul/templates/sync-catalog-clusterrole.yaml @@ -18,15 +18,12 @@ rules: - get - list - watch -<<<<<<< Updated upstream -======= {{- if .Values.syncCatalog.toK8S }} - update - patch - delete - create {{- end }} ->>>>>>> Stashed changes - apiGroups: ["discovery.k8s.io"] resources: - endpointslices From 5dcb4537b3b68ae00d105bef6d316a1fb5c53560 Mon Sep 17 00:00:00 2001 From: John Murret Date: Thu, 7 Mar 2024 16:05:00 -0700 Subject: [PATCH 12/16] adding endpoints permission back to sync catalog since it still uses it. 
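A note on the RBAC churn in patches 10 through 12: the sync process now discovers a Service's backing endpoints by listing EndpointSlices that carry the well-known `kubernetes.io/service-name` label, which is why the ClusterRole grants get/list/watch on the `discovery.k8s.io` group, while the legacy `endpoints` permission is re-added below (and removed again in patch 15) for a code path that still read Endpoints objects. The following is a minimal client-go sketch of that label-based lookup; it is not code from this series, and the function name and arguments are illustrative.

```go
// Sketch: list the EndpointSlices that back a Service via the standard
// "kubernetes.io/service-name" label. client, namespace, and serviceName
// are assumed inputs, not names taken from the patches.
package main

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func listBackingSlices(ctx context.Context, client kubernetes.Interface, namespace, serviceName string) ([]discoveryv1.EndpointSlice, error) {
	// A Service can own any number of EndpointSlices, so a label-filtered
	// List replaces the old one-shot Get of a single Endpoints object.
	selector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)
	list, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}
```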
--- charts/consul/templates/sync-catalog-clusterrole.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml index 89ea9f3c5c..c59436b2d5 100644 --- a/charts/consul/templates/sync-catalog-clusterrole.yaml +++ b/charts/consul/templates/sync-catalog-clusterrole.yaml @@ -14,6 +14,7 @@ rules: - apiGroups: [ "" ] resources: - services + - endpoints verbs: - get - list From 09506ba867c06b14e21fce8799c5f0a6a00683f8 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Sat, 16 Mar 2024 19:19:38 -0600 Subject: [PATCH 13/16] Fix endpointslice map --- control-plane/catalog/to-consul/resource.go | 96 +++++++++++++------ .../catalog/to-consul/resource_test.go | 12 ++- 2 files changed, 76 insertions(+), 32 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 9e7fe28cb9..b91e1ed1d1 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -146,9 +146,11 @@ type ServiceResource struct { // in the form <namespace>/<name>. serviceMap map[string]*corev1.Service - // endpointSlicesMap uses the same keys as serviceMap but maps to a list of EndpointSlices - // of each service. - endpointSlicesMap map[string][]discoveryv1.EndpointSlice + // endpointSlicesMap tracks EndpointSlices associated with services that are being synced to Consul. + // The outer map's keys represent service identifiers in the same format as serviceMap and maps + // each service to its related EndpointSlices. The inner map's keys are EndpointSlice name keys + // in the format "<namespace>/<endpointSliceName>". + endpointSlicesMap map[string]map[string]discoveryv1.EndpointSlice // EnableIngress enables syncing of the hostname from an Ingress resource // to the service registration if an Ingress rule matches the service. @@ -230,8 +232,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { // If we care about endpoints, we should load the associated endpoint slices. if t.shouldTrackEndpoints(key) { - - var allEndpointSlices []discoveryv1.EndpointSlice + allEndpointSlices := make(map[string]discoveryv1.EndpointSlice) labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name) continueToken := "" limit := int64(100) @@ -250,10 +251,14 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { t.Log.Warn("error loading endpoint slices list", "key", key, "err", err) - return err + break + } - allEndpointSlices = append(allEndpointSlices, endpointSliceList.Items...)
+ for _, endpointSlice := range endpointSliceList.Items { + endptKey := service.Namespace + "/" + endpointSlice.Name + allEndpointSlices[endptKey] = endpointSlice + } if endpointSliceList.Continue != "" { continueToken = endpointSliceList.Continue @@ -263,7 +268,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { } if t.endpointSlicesMap == nil { - t.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice) + t.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice) } t.endpointSlicesMap[key] = allEndpointSlices t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices) @@ -627,7 +632,7 @@ func (t *ServiceResource) generateRegistrations(key string) { // Look up the node's ip address by getting node info node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) if err != nil { - t.Log.Warn("error getting node info", "error", err) + t.Log.Error("error getting node info", "error", err) continue } @@ -739,8 +744,7 @@ func (t *ServiceResource) registerServiceInstance( // Used for adding node region label to an endpoint but only needs to be called once node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) if err != nil { - t.Log.Warn("error getting node info", "error", err) - continue + t.Log.Error("error getting node info", "error", err) } for _, endpointAddr := range endpoint.Addresses { @@ -789,12 +793,15 @@ func (t *ServiceResource) registerServiceInstance( if endpoint.NodeName != nil { r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName } + if node.Labels != nil { + if region := node.Labels[corev1.LabelTopologyRegion]; region != "" { + r.Service.Meta[ConsulK8STopologyRegion] = region + } + } + if endpoint.Zone != nil { r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone } - if region := node.Labels[corev1.LabelTopologyRegion]; region != "" { - r.Service.Meta[ConsulK8STopologyRegion] = region - } r.Check = &consulapi.AgentCheck{ CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)), @@ -873,51 +880,84 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer { ) } -func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error { +func (t *serviceEndpointsResource) Upsert(endptKey string, raw interface{}) error { svc := t.Service endpointSlice, ok := raw.(*discoveryv1.EndpointSlice) if !ok { - svc.Log.Warn("upsert got invalid type", "raw", raw) + svc.Log.Error("upsert got invalid type", "raw", raw) + return nil } svc.serviceLock.Lock() defer svc.serviceLock.Unlock() + // Extract service name and format key + svcName := endpointSlice.Labels[discoveryv1.LabelServiceName] + svcKey := endpointSlice.Namespace + "/" + endpointSlice.Labels[discoveryv1.LabelServiceName] + + if svc.serviceMap == nil { + svc.serviceMap = make(map[string]*corev1.Service) + } + var err error + if svc.serviceMap[svcKey] == nil { + svc.serviceMap[svcKey], err = t.Service.Client.CoreV1().Services(endpointSlice.Namespace).Get(t.Ctx, svcName, metav1.GetOptions{}) + if err != nil { + t.Log.Error("issue getting service", "error", err) + return err + } + } + // Check if we care about endpoints for this service - if !svc.shouldTrackEndpoints(key) { + if !svc.shouldTrackEndpoints(svcKey) { return nil } // We are tracking this service so let's keep track of the endpoints if svc.endpointSlicesMap == nil { - svc.endpointSlicesMap = 
make(map[string][]discoveryv1.EndpointSlice) + svc.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice) } - svc.endpointSlicesMap[key] = []discoveryv1.EndpointSlice{*endpointSlice} + if _, ok := svc.endpointSlicesMap[svcKey]; !ok { + svc.endpointSlicesMap[svcKey] = make(map[string]discoveryv1.EndpointSlice) + } + svc.endpointSlicesMap[svcKey][endptKey] = *endpointSlice // Update the registration and trigger a sync - svc.generateRegistrations(key) + svc.generateRegistrations(svcKey) svc.sync() - svc.Log.Info("upsert endpoint", "key", key) + svc.Log.Info("upsert endpoint", "key", endptKey) return nil } -func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error { +func (t *serviceEndpointsResource) Delete(endptKey string, raw interface{}) error { + + endpointSlice, ok := raw.(*discoveryv1.EndpointSlice) + if !ok { + t.Service.Log.Error("delete got invalid type", "raw", raw) + return nil + } + t.Service.serviceLock.Lock() defer t.Service.serviceLock.Unlock() + // Extract service name and format key + svcName := endpointSlice.Labels[discoveryv1.LabelServiceName] + svcKey := endpointSlice.Namespace + "/" + svcName + // This is a bit of an optimization. We only want to force a resync // if we were tracking this endpoint to begin with and that endpoint // had associated registrations. - if _, ok := t.Service.endpointSlicesMap[key]; ok { - delete(t.Service.endpointSlicesMap, key) - if _, ok := t.Service.consulMap[key]; ok { - delete(t.Service.consulMap, key) - t.Service.sync() + if _, ok := t.Service.endpointSlicesMap[svcKey]; ok { + if _, ok := t.Service.endpointSlicesMap[svcKey][endptKey]; ok { + delete(t.Service.endpointSlicesMap[svcKey], endptKey) + if _, ok := t.Service.consulMap[svcKey]; ok { + delete(t.Service.consulMap, svcKey) + t.Service.sync() + } } } - t.Service.Log.Info("delete endpoint", "key", key) + t.Service.Log.Info("delete endpoint", "key", endptKey) return nil } diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index a49c3b5790..04d92f80d2 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -18,6 +18,7 @@ import ( networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/pointer" @@ -46,6 +47,9 @@ func TestServiceResource_createDelete(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) + createNodes(t, client) + createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault) + // Delete require.NoError(t, client.CoreV1().Services(metav1.NamespaceDefault).Delete(context.Background(), "foo", metav1.DeleteOptions{})) @@ -977,12 +981,12 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { createNodes(t, client) - createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) - // Insert the service svc := nodePortService("foo", metav1.NamespaceDefault) svc.Annotations = map[string]string{annotationServicePort: "rpc"} _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) + createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault) + require.NoError(t, err) // Verify what we got @@ -2136,8 +2140,8 @@ func createEndpointSlice(t *testing.T, client 
*fake.Clientset, serviceName strin context.Background(), &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: serviceName + "-", - Labels: map[string]string{discoveryv1.LabelServiceName: serviceName}, + Labels: map[string]string{discoveryv1.LabelServiceName: serviceName}, + Name: serviceName + "-" + rand.String(5), }, AddressType: discoveryv1.AddressTypeIPv4, Endpoints: []discoveryv1.Endpoint{ From 5606e615d94262ac878a0b3be374b7f19565256a Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Sat, 16 Mar 2024 20:14:47 -0600 Subject: [PATCH 14/16] Fix topology region --- control-plane/catalog/to-consul/resource.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index b91e1ed1d1..f8b426ad07 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -252,7 +252,6 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { "key", key, "err", err) break - } for _, endpointSlice := range endpointSliceList.Items { @@ -741,10 +740,15 @@ func (t *ServiceResource) registerServiceInstance( } } for _, endpoint := range endpointSlice.Endpoints { - // Used for adding node region label to an endpoint but only needs to be called once - node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) - if err != nil { - t.Log.Error("error getting node info", "error", err) + // Retrieve the node information for setting topology region in the synced Consul service + var node *corev1.Node + if endpoint.NodeName != nil { + var err error + // Retrieve the node information only if the NodeName is available. + node, err = t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) + if err != nil { + t.Log.Error("error getting node info", "error", err) + } } for _, endpointAddr := range endpoint.Addresses { @@ -793,12 +797,11 @@ func (t *ServiceResource) registerServiceInstance( if endpoint.NodeName != nil { r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName } - if node.Labels != nil { - if region := node.Labels[corev1.LabelTopologyRegion]; region != "" { + if node != nil && node.Labels != nil { + if region, ok := node.Labels[corev1.LabelTopologyRegion]; ok && region != "" { r.Service.Meta[ConsulK8STopologyRegion] = region } } - if endpoint.Zone != nil { r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone } From 9c41e074ca818ae4b9a7bb26bd15b1b8d89aa5ff Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:46:52 -0600 Subject: [PATCH 15/16] Remove region lookups, remove endpoints permissions, use pointers for endpointslice map --- .../templates/sync-catalog-clusterrole.yaml | 1 - control-plane/catalog/to-consul/resource.go | 41 ++++++------------- 2 files changed, 12 insertions(+), 30 deletions(-) diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml index c59436b2d5..89ea9f3c5c 100644 --- a/charts/consul/templates/sync-catalog-clusterrole.yaml +++ b/charts/consul/templates/sync-catalog-clusterrole.yaml @@ -14,7 +14,6 @@ rules: - apiGroups: [ "" ] resources: - services - - endpoints verbs: - get - list diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index f8b426ad07..2933d4375c 100644 --- a/control-plane/catalog/to-consul/resource.go +++ 
b/control-plane/catalog/to-consul/resource.go @@ -34,12 +34,11 @@ const ( // ConsulK8SNS is the key used in the meta to record the namespace // of the service/node registration. - ConsulK8SNS = "external-k8s-ns" - ConsulK8SRefKind = "external-k8s-ref-kind" - ConsulK8SRefValue = "external-k8s-ref-name" - ConsulK8SNodeName = "external-k8s-node-name" - ConsulK8STopologyRegion = "external-k8s-topology-region" - ConsulK8STopologyZone = "external-k8s-topology-zone" + ConsulK8SNS = "external-k8s-ns" + ConsulK8SRefKind = "external-k8s-ref-kind" + ConsulK8SRefValue = "external-k8s-ref-name" + ConsulK8SNodeName = "external-k8s-node-name" + ConsulK8STopologyZone = "external-k8s-topology-zone" // consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status. consulKubernetesCheckType = "kubernetes-readiness" @@ -150,7 +149,7 @@ type ServiceResource struct { // The outer map's keys represent service identifiers in the same format as serviceMap and maps // each service to its related EndpointSlices. The inner map's keys are EndpointSlice name keys // in the format "<namespace>/<endpointSliceName>". - endpointSlicesMap map[string]map[string]discoveryv1.EndpointSlice + endpointSlicesMap map[string]map[string]*discoveryv1.EndpointSlice // EnableIngress enables syncing of the hostname from an Ingress resource // to the service registration if an Ingress rule matches the service. @@ -232,7 +231,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { // If we care about endpoints, we should load the associated endpoint slices. if t.shouldTrackEndpoints(key) { - allEndpointSlices := make(map[string]discoveryv1.EndpointSlice) + allEndpointSlices := make(map[string]*discoveryv1.EndpointSlice) labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name) continueToken := "" limit := int64(100) @@ -256,7 +255,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { for _, endpointSlice := range endpointSliceList.Items { endptKey := service.Namespace + "/" + endpointSlice.Name - allEndpointSlices[endptKey] = endpointSlice + allEndpointSlices[endptKey] = &endpointSlice } if endpointSliceList.Continue != "" { @@ -267,7 +266,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error { } if t.endpointSlicesMap == nil { - t.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice) + t.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice) } t.endpointSlicesMap[key] = allEndpointSlices t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices) @@ -740,17 +739,6 @@ func (t *ServiceResource) registerServiceInstance( } } for _, endpoint := range endpointSlice.Endpoints { - // Retrieve the node information for setting topology region in the synced Consul service - var node *corev1.Node - if endpoint.NodeName != nil { - var err error - // Retrieve the node information only if the NodeName is available.
- node, err = t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{}) - if err != nil { - t.Log.Error("error getting node info", "error", err) - } - } - for _, endpointAddr := range endpoint.Addresses { var addr string @@ -797,11 +785,6 @@ func (t *ServiceResource) registerServiceInstance( if endpoint.NodeName != nil { r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName } - if node != nil && node.Labels != nil { - if region, ok := node.Labels[corev1.LabelTopologyRegion]; ok && region != "" { - r.Service.Meta[ConsulK8STopologyRegion] = region - } - } if endpoint.Zone != nil { r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone } @@ -918,12 +901,12 @@ func (t *serviceEndpointsResource) Upsert(endptKey string, raw interface{}) erro // We are tracking this service so let's keep track of the endpoints if svc.endpointSlicesMap == nil { - svc.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice) + svc.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice) } if _, ok := svc.endpointSlicesMap[svcKey]; !ok { - svc.endpointSlicesMap[svcKey] = make(map[string]discoveryv1.EndpointSlice) + svc.endpointSlicesMap[svcKey] = make(map[string]*discoveryv1.EndpointSlice) } - svc.endpointSlicesMap[svcKey][endptKey] = *endpointSlice + svc.endpointSlicesMap[svcKey][endptKey] = endpointSlice // Update the registration and trigger a sync svc.generateRegistrations(svcKey) From 70d2971ea1bc6e4c0ddfee55d9714c831b8e6365 Mon Sep 17 00:00:00 2001 From: jukie <10012479+Jukie@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:51:34 -0600 Subject: [PATCH 16/16] Drop region test --- control-plane/catalog/to-consul/resource_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 04d92f80d2..3272849bd3 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -1175,7 +1175,6 @@ func TestServiceResource_clusterIP(t *testing.T) { require.Equal(r, 8080, actual[1].Service.Port) require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"]) require.Equal(r, "us-west-2b", actual[1].Service.Meta["external-k8s-topology-zone"]) - require.Equal(r, "us-west-2", actual[0].Service.Meta["external-k8s-topology-region"]) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) }) } @@ -2092,9 +2091,6 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No node1 := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName1, - Labels: map[string]string{ - corev1.LabelTopologyRegion: "us-west-2", - }, }, Status: corev1.NodeStatus{ @@ -2111,9 +2107,6 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.No node2 := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName2, - Labels: map[string]string{ - corev1.LabelTopologyRegion: "us-west-2", - }, }, Status: corev1.NodeStatus{
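One caveat worth recording against the final shape of the loader in patch 15: inside `for _, endpointSlice := range endpointSliceList.Items`, storing `&endpointSlice` in the map is only safe on Go 1.22 or newer, where each loop iteration gets a fresh variable; on older toolchains every map entry would alias the last slice examined. Below is a hedged sketch of the paginated load with an explicit per-iteration copy. It assumes a client-go clientset and uses illustrative names; it is not a drop-in replacement for the patched method.

```go
// Sketch: paginate EndpointSlices for one Service using the List
// Continue token (the pattern patch 13 introduces) and store pointers
// without aliasing the range variable.
package main

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func loadAllSlices(ctx context.Context, client kubernetes.Interface, namespace, serviceName string) (map[string]*discoveryv1.EndpointSlice, error) {
	all := make(map[string]*discoveryv1.EndpointSlice)
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName),
		Limit:         100, // page size, matching the limit used in the patches
	}
	for {
		list, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, opts)
		if err != nil {
			return nil, err
		}
		for i := range list.Items {
			slice := list.Items[i] // fresh copy per iteration; safe to address on any Go version
			all[namespace+"/"+slice.Name] = &slice
		}
		if list.Continue == "" {
			break // no further pages
		}
		opts.Continue = list.Continue
	}
	return all, nil
}
```

A related design note: patch 13 changes the page loop to `break` on a List error rather than returning it, so a failed page leaves the service synced from a partial set of slices; returning the error, as earlier revisions did, trades availability of a partial view for a forced retry with a complete one.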