diff --git a/.changelog/3693.txt b/.changelog/3693.txt
new file mode 100644
index 0000000000..b26e6da0a4
--- /dev/null
+++ b/.changelog/3693.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+catalog: Topology zone information is now read from the Kubernetes EndpointSlices and the associated node, and added to registered Consul services under Meta.
+```
\ No newline at end of file
diff --git a/charts/consul/templates/sync-catalog-clusterrole.yaml b/charts/consul/templates/sync-catalog-clusterrole.yaml
index 585b5ad171..89ea9f3c5c 100644
--- a/charts/consul/templates/sync-catalog-clusterrole.yaml
+++ b/charts/consul/templates/sync-catalog-clusterrole.yaml
@@ -14,7 +14,19 @@ rules:
 - apiGroups: [ "" ]
   resources:
   - services
-  - endpoints
+  verbs:
+  - get
+  - list
+  - watch
+{{- if .Values.syncCatalog.toK8S }}
+  - update
+  - patch
+  - delete
+  - create
+{{- end }}
+- apiGroups: ["discovery.k8s.io"]
+  resources:
+  - endpointslices
   verbs:
   - get
   - list
   - watch
@@ -45,4 +57,4 @@ rules:
   - get
   - list
   - watch
-{{- end }}
\ No newline at end of file
+{{- end }}
diff --git a/charts/consul/test/unit/sync-catalog-clusterrole.bats b/charts/consul/test/unit/sync-catalog-clusterrole.bats
index 17141e434f..afc3a42b45 100755
--- a/charts/consul/test/unit/sync-catalog-clusterrole.bats
+++ b/charts/consul/test/unit/sync-catalog-clusterrole.bats
@@ -56,7 +56,7 @@ load _helpers
       --set 'syncCatalog.enabled=true' \
       --set 'global.enablePodSecurityPolicies=true' \
       . | tee /dev/stderr |
-      yq -r '.rules[2].resources[0]' | tee /dev/stderr)
+      yq -r '.rules[3].resources[0]' | tee /dev/stderr)
   [ "${actual}" = "podsecuritypolicies" ]
 }

@@ -83,4 +83,12 @@ load _helpers
       . | tee /dev/stderr |
       yq -c '.rules[0].verbs' | tee /dev/stderr)
   [ "${actual}" = '["get","list","watch","update","patch","delete","create"]' ]
+
+  actual=$(helm template \
+      -s templates/sync-catalog-clusterrole.yaml \
+      --set 'syncCatalog.enabled=true' \
+      --set 'syncCatalog.toK8S=true' \
+      . | tee /dev/stderr |
+      yq -c '.rules[1].verbs' | tee /dev/stderr)
+  [ "${actual}" = '["get","list","watch"]' ]
 }
diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go
index 2d29d6c15a..2933d4375c 100644
--- a/control-plane/catalog/to-consul/resource.go
+++ b/control-plane/catalog/to-consul/resource.go
@@ -17,6 +17,7 @@ import (
 	consulapi "github.com/hashicorp/consul/api"
 	"github.com/hashicorp/go-hclog"
 	corev1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -33,10 +34,11 @@ const (
 	// ConsulK8SNS is the key used in the meta to record the namespace
 	// of the service/node registration.
-	ConsulK8SNS       = "external-k8s-ns"
-	ConsulK8SRefKind  = "external-k8s-ref-kind"
-	ConsulK8SRefValue = "external-k8s-ref-name"
-	ConsulK8SNodeName = "external-k8s-node-name"
+	ConsulK8SNS           = "external-k8s-ns"
+	ConsulK8SRefKind      = "external-k8s-ref-kind"
+	ConsulK8SRefValue     = "external-k8s-ref-name"
+	ConsulK8SNodeName     = "external-k8s-node-name"
+	ConsulK8STopologyZone = "external-k8s-topology-zone"

 	// consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status.
 	consulKubernetesCheckType = "kubernetes-readiness"
@@ -143,9 +145,11 @@ type ServiceResource struct {
 	// in the form <namespace>/<name>.
 	serviceMap map[string]*corev1.Service

-	// endpointsMap uses the same keys as serviceMap but maps to the endpoints
-	// of each service.
-	endpointsMap map[string]*corev1.Endpoints
+	// endpointSlicesMap tracks the EndpointSlices associated with services that are being synced
+	// to Consul. The outer map's keys are service identifiers in the same format as serviceMap,
+	// and each entry maps a service to its related EndpointSlices. The inner map's keys are
+	// EndpointSlice names in the format "<namespace>/<endpointSliceName>".
+	endpointSlicesMap map[string]map[string]*discoveryv1.EndpointSlice

 	// EnableIngress enables syncing of the hostname from an Ingress resource
 	// to the service registration if an Ingress rule matches the service.
@@ -225,22 +229,47 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
 	t.serviceMap[key] = service
 	t.Log.Debug("[ServiceResource.Upsert] adding service to serviceMap", "key", key, "service", service)

-	// If we care about endpoints, we should do the initial endpoints load.
+	// If we care about endpoints, we should load the associated endpoint slices.
 	if t.shouldTrackEndpoints(key) {
-		endpoints, err := t.Client.CoreV1().
-			Endpoints(service.Namespace).
-			Get(t.Ctx, service.Name, metav1.GetOptions{})
-		if err != nil {
-			t.Log.Warn("error loading initial endpoints",
-				"key", key,
-				"err", err)
-		} else {
-			if t.endpointsMap == nil {
-				t.endpointsMap = make(map[string]*corev1.Endpoints)
+		allEndpointSlices := make(map[string]*discoveryv1.EndpointSlice)
+		labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)
+		continueToken := ""
+		limit := int64(100)
+
+		for {
+			opts := metav1.ListOptions{
+				LabelSelector: labelSelector,
+				Limit:         limit,
+				Continue:      continueToken,
 			}
-			t.endpointsMap[key] = endpoints
-			t.Log.Debug("[ServiceResource.Upsert] adding service's endpoints to endpointsMap", "key", key, "service", service, "endpoints", endpoints)
+			endpointSliceList, err := t.Client.DiscoveryV1().
+				EndpointSlices(service.Namespace).
+				List(t.Ctx, opts)
+			if err != nil {
+				t.Log.Warn("error loading endpoint slices list",
+					"key", key,
+					"err", err)
+				break
+			}
+
+			// Index into Items so that each map entry points at a distinct
+			// EndpointSlice rather than at the shared loop variable.
+			for i := range endpointSliceList.Items {
+				endpointSlice := endpointSliceList.Items[i]
+				endptKey := service.Namespace + "/" + endpointSlice.Name
+				allEndpointSlices[endptKey] = &endpointSlice
+			}
+
+			if endpointSliceList.Continue != "" {
+				continueToken = endpointSliceList.Continue
+			} else {
+				break
+			}
+		}
+
+		if t.endpointSlicesMap == nil {
+			t.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice)
 		}
+		t.endpointSlicesMap[key] = allEndpointSlices
+		t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
 	}

 	// Update the registration and trigger a sync
@@ -265,8 +294,8 @@ func (t *ServiceResource) Delete(key string, _ interface{}) error {
 func (t *ServiceResource) doDelete(key string) {
 	delete(t.serviceMap, key)
 	t.Log.Debug("[doDelete] deleting service from serviceMap", "key", key)
-	delete(t.endpointsMap, key)
-	t.Log.Debug("[doDelete] deleting endpoints from endpointsMap", "key", key)
+	delete(t.endpointSlicesMap, key)
+	t.Log.Debug("[doDelete] deleting endpoint slices from endpointSlicesMap", "key", key)
 	// If there were registrations related to this service, then
 	// delete them and sync.
 	if _, ok := t.consulMap[key]; ok {
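The initial load in `Upsert` above pages through the Discovery API with the standard client-go continue-token pattern, since a single service may own several EndpointSlices and each `List` call returns at most `Limit` items per page. A minimal standalone sketch of the same pattern (the `client`, namespace, and service name here are illustrative assumptions, not part of this change):

```go
package main

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listServiceSlices collects every EndpointSlice labeled as belonging to
// svcName, following the Continue token until the server reports no more pages.
func listServiceSlices(ctx context.Context, client kubernetes.Interface, ns, svcName string) ([]discoveryv1.EndpointSlice, error) {
	var out []discoveryv1.EndpointSlice
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svcName),
		Limit:         100,
	}
	for {
		list, err := client.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
		if err != nil {
			return nil, err
		}
		out = append(out, list.Items...)
		if list.Continue == "" {
			return out, nil
		}
		opts.Continue = list.Continue
	}
}
```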
@@ -582,27 +611,26 @@ func (t *ServiceResource) generateRegistrations(key string) {
 		// pods are running on. This way we don't register _every_ K8S
 		// node as part of the service.
 	case corev1.ServiceTypeNodePort:
-		if t.endpointsMap == nil {
+		if t.endpointSlicesMap == nil {
 			return
 		}

-		endpoints := t.endpointsMap[key]
-		if endpoints == nil {
+		endpointSliceList := t.endpointSlicesMap[key]
+		if endpointSliceList == nil {
 			return
 		}

-		for _, subset := range endpoints.Subsets {
-			for _, subsetAddr := range subset.Addresses {
+		for _, endpointSlice := range endpointSliceList {
+			for _, endpoint := range endpointSlice.Endpoints {
 				// Check that the node name exists.
-				// subsetAddr.NodeName is of type *string
-				if subsetAddr.NodeName == nil {
+				// endpoint.NodeName is of type *string.
+				if endpoint.NodeName == nil {
 					continue
 				}

-				// Look up the node's ip address by getting node info
-				node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *subsetAddr.NodeName, metav1.GetOptions{})
+				node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
 				if err != nil {
-					t.Log.Warn("error getting node info", "error", err)
+					t.Log.Error("error getting node info", "error", err)
 					continue
 				}

@@ -614,37 +642,18 @@ func (t *ServiceResource) generateRegistrations(key string) {
 					expectedType = corev1.NodeExternalIP
 				}

-				// Find the ip address for the node and
-				// create the Consul service using it
-				var found bool
-				for _, address := range node.Status.Addresses {
-					if address.Type == expectedType {
-						found = true
-						r := baseNode
-						rs := baseService
-						r.Service = &rs
-						r.Service.ID = serviceID(r.Service.Service, subsetAddr.IP)
-						r.Service.Address = address.Address
-
-						t.consulMap[key] = append(t.consulMap[key], &r)
-						// Only consider the first address that matches. In some cases
-						// there will be multiple addresses like when using AWS CNI.
-						// In those cases, Kubernetes will ensure eth0 is always the first
-						// address in the list.
-						// See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464
-						break
-					}
-				}
+				for _, endpointAddr := range endpoint.Addresses {

-				// If an ExternalIP wasn't found, and ExternalFirst is set,
-				// use an InternalIP
-				if t.NodePortSync == ExternalFirst && !found {
+					// Find the IP address for the node and
+					// create the Consul service using it.
+					var found bool
 					for _, address := range node.Status.Addresses {
-						if address.Type == corev1.NodeInternalIP {
+						if address.Type == expectedType {
+							found = true
 							r := baseNode
 							rs := baseService
 							r.Service = &rs
-							r.Service.ID = serviceID(r.Service.Service, subsetAddr.IP)
+							r.Service.ID = serviceID(r.Service.Service, endpointAddr)
 							r.Service.Address = address.Address

 							t.consulMap[key] = append(t.consulMap[key], &r)
@@ -656,6 +665,29 @@ func (t *ServiceResource) generateRegistrations(key string) {
 							break
 						}
 					}
+
+					// If an ExternalIP wasn't found, and ExternalFirst is set,
+					// use an InternalIP.
+					if t.NodePortSync == ExternalFirst && !found {
+						for _, address := range node.Status.Addresses {
+							if address.Type == corev1.NodeInternalIP {
+								r := baseNode
+								rs := baseService
+								r.Service = &rs
+								r.Service.ID = serviceID(r.Service.Service, endpointAddr)
+								r.Service.Address = address.Address
+
+								t.consulMap[key] = append(t.consulMap[key], &r)
+								// Only consider the first address that matches. In some cases
+								// there will be multiple addresses, like when using the AWS CNI.
+								// In those cases, Kubernetes will ensure eth0 is always the first
+								// address in the list.
+								// See https://github.com/kubernetes/kubernetes/blob/b559434c02f903dbcd46ee7d6c78b216d3f0aca0/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go#L1462-L1464
+								break
+							}
+						}
+					}
+				}
 			}
 		}
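The NodePort branch above registers the node's address rather than the pod's: it prefers the configured address type and, in `ExternalFirst` mode, falls back to an `InternalIP` when no `ExternalIP` exists. A hedged sketch of that selection logic in isolation (`pickNodeAddress` is a hypothetical helper, not part of this change):

```go
package main

import corev1 "k8s.io/api/core/v1"

// pickNodeAddress returns the first node address of the preferred type.
// When fallbackInternal is set and no preferred address exists, it returns
// the first InternalIP instead, mirroring the ExternalFirst behavior.
// Only the first match is used because nodes can report several addresses
// (e.g. with the AWS CNI), and Kubernetes lists eth0 first.
func pickNodeAddress(node *corev1.Node, preferred corev1.NodeAddressType, fallbackInternal bool) string {
	for _, address := range node.Status.Addresses {
		if address.Type == preferred {
			return address.Address
		}
	}
	if fallbackInternal {
		for _, address := range node.Status.Addresses {
			if address.Type == corev1.NodeInternalIP {
				return address.Address
			}
		}
	}
	return ""
}
```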
@@ -675,94 +707,100 @@ func (t *ServiceResource) registerServiceInstance(
 	overridePortNumber int,
 	useHostname bool) {

-	if t.endpointsMap == nil {
+	if t.endpointSlicesMap == nil {
 		return
 	}

-	endpoints := t.endpointsMap[key]
-	if endpoints == nil {
+	endpointSliceList := t.endpointSlicesMap[key]
+	if endpointSliceList == nil {
 		return
 	}

 	seen := map[string]struct{}{}
-	for _, subset := range endpoints.Subsets {
+	for _, endpointSlice := range endpointSliceList {
 		// For ClusterIP services and if LoadBalancerEndpointsSync is true, we use the endpoint port instead
 		// of the service port because we're registering each endpoint
 		// as a separate service instance.
 		epPort := baseService.Port
 		if overridePortName != "" {
 			// If we're supposed to use a specific named port, find it.
-			for _, p := range subset.Ports {
-				if overridePortName == p.Name {
-					epPort = int(p.Port)
+			for _, p := range endpointSlice.Ports {
+				if p.Name != nil && overridePortName == *p.Name {
+					epPort = int(*p.Port)
 					break
 				}
 			}
 		} else if overridePortNumber == 0 {
 			// Otherwise we'll just use the first port in the list
 			// (unless the port number was overridden by an annotation).
-			for _, p := range subset.Ports {
-				epPort = int(p.Port)
+			for _, p := range endpointSlice.Ports {
+				epPort = int(*p.Port)
 				break
 			}
 		}

-		for _, subsetAddr := range subset.Addresses {
-			var addr string
-			// Use the address and port from the Ingress resource if
-			// ingress-sync is enabled and the service has an ingress
-			// resource that references it.
-			if t.EnableIngress && t.isIngressService(key) {
-				addr = t.serviceHostnameMap[key].hostName
-				epPort = int(t.serviceHostnameMap[key].port)
-			} else {
-				addr = subsetAddr.IP
-				if addr == "" && useHostname {
-					addr = subsetAddr.Hostname
-				}
-				if addr == "" {
-					continue
-				}
-			}
-
-			// Its not clear whether K8S guarantees ready addresses to
-			// be unique so we maintain a set to prevent duplicates just
-			// in case.
-			if _, ok := seen[addr]; ok {
-				continue
-			}
-			seen[addr] = struct{}{}
-
-			r := baseNode
-			rs := baseService
-			r.Service = &rs
-			r.Service.ID = serviceID(r.Service.Service, addr)
-			r.Service.Address = addr
-			r.Service.Port = epPort
-			r.Service.Meta = make(map[string]string)
-			// Deepcopy baseService.Meta into r.Service.Meta as baseService is shared
-			// between all nodes of a service
-			for k, v := range baseService.Meta {
-				r.Service.Meta[k] = v
-			}
-			if subsetAddr.TargetRef != nil {
-				r.Service.Meta[ConsulK8SRefValue] = subsetAddr.TargetRef.Name
-				r.Service.Meta[ConsulK8SRefKind] = subsetAddr.TargetRef.Kind
-			}
-			if subsetAddr.NodeName != nil {
-				r.Service.Meta[ConsulK8SNodeName] = *subsetAddr.NodeName
-			}
-
-			r.Check = &consulapi.AgentCheck{
-				CheckID:   consulHealthCheckID(endpoints.Namespace, serviceID(r.Service.Service, addr)),
-				Name:      consulKubernetesCheckName,
-				Namespace: baseService.Namespace,
-				Type:      consulKubernetesCheckType,
-				Status:    consulapi.HealthPassing,
-				ServiceID: serviceID(r.Service.Service, addr),
-				Output:    kubernetesSuccessReasonMsg,
-			}
-
-			t.consulMap[key] = append(t.consulMap[key], &r)
+		for _, endpoint := range endpointSlice.Endpoints {
+			for _, endpointAddr := range endpoint.Addresses {
+
+				var addr string
+				// Use the address and port from the Ingress resource if
+				// ingress-sync is enabled and the service has an ingress
+				// resource that references it.
+				if t.EnableIngress && t.isIngressService(key) {
+					addr = t.serviceHostnameMap[key].hostName
+					epPort = int(t.serviceHostnameMap[key].port)
+				} else {
+					addr = endpointAddr
+					if addr == "" && useHostname && endpoint.Hostname != nil {
+						addr = *endpoint.Hostname
+					}
+					if addr == "" {
+						continue
+					}
+				}
+
+				// It's not clear whether K8S guarantees ready addresses to
+				// be unique so we maintain a set to prevent duplicates just
+				// in case.
+				if _, ok := seen[addr]; ok {
+					continue
+				}
+				seen[addr] = struct{}{}
+
+				r := baseNode
+				rs := baseService
+				r.Service = &rs
+				r.Service.ID = serviceID(r.Service.Service, addr)
+				r.Service.Address = addr
+				r.Service.Port = epPort
+				r.Service.Meta = make(map[string]string)
+				// Deep-copy baseService.Meta into r.Service.Meta, as baseService is shared
+				// between all nodes of a service.
+				for k, v := range baseService.Meta {
+					r.Service.Meta[k] = v
+				}
+				if endpoint.TargetRef != nil {
+					r.Service.Meta[ConsulK8SRefValue] = endpoint.TargetRef.Name
+					r.Service.Meta[ConsulK8SRefKind] = endpoint.TargetRef.Kind
+				}
+				if endpoint.NodeName != nil {
+					r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName
+				}
+				if endpoint.Zone != nil {
+					r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
+				}
+
+				r.Check = &consulapi.AgentCheck{
+					CheckID:   consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)),
+					Name:      consulKubernetesCheckName,
+					Namespace: baseService.Namespace,
+					Type:      consulKubernetesCheckType,
+					Status:    consulapi.HealthPassing,
+					ServiceID: serviceID(r.Service.Service, addr),
+					Output:    kubernetesSuccessReasonMsg,
+				}
+
+				t.consulMap[key] = append(t.consulMap[key], &r)
+			}
 		}
 	}
 }
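Once a sync has run, the new `external-k8s-topology-zone` key appears in the registered service's meta alongside the existing `external-k8s-*` keys. A quick way to eyeball it against a running agent (this sketch assumes a reachable Consul agent and a synced service named `foo`; both are assumptions for illustration):

```go
package main

import (
	"fmt"

	capi "github.com/hashicorp/consul/api"
)

func main() {
	// DefaultConfig reads CONSUL_HTTP_ADDR and friends from the environment.
	client, err := capi.NewClient(capi.DefaultConfig())
	if err != nil {
		panic(err)
	}
	services, _, err := client.Catalog().Service("foo", "", nil)
	if err != nil {
		panic(err)
	}
	for _, s := range services {
		fmt.Printf("%s: zone=%q node=%q\n",
			s.ServiceID,
			s.ServiceMeta["external-k8s-topology-zone"],
			s.ServiceMeta["external-k8s-node-name"])
	}
}
```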
@@ -811,68 +849,101 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer {
 	return cache.NewSharedIndexInformer(
 		&cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return t.Service.Client.CoreV1().
-					Endpoints(metav1.NamespaceAll).
+				return t.Service.Client.DiscoveryV1().
+					EndpointSlices(metav1.NamespaceAll).
 					List(t.Ctx, options)
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return t.Service.Client.CoreV1().
-					Endpoints(metav1.NamespaceAll).
+				return t.Service.Client.DiscoveryV1().
+					EndpointSlices(metav1.NamespaceAll).
 					Watch(t.Ctx, options)
 			},
 		},
-		&corev1.Endpoints{},
+		&discoveryv1.EndpointSlice{},
 		0,
 		cache.Indexers{},
 	)
 }

-func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
+func (t *serviceEndpointsResource) Upsert(endptKey string, raw interface{}) error {
 	svc := t.Service
-	endpoints, ok := raw.(*corev1.Endpoints)
+
+	endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
 	if !ok {
-		svc.Log.Warn("upsert got invalid type", "raw", raw)
+		svc.Log.Error("upsert got invalid type", "raw", raw)
 		return nil
 	}

 	svc.serviceLock.Lock()
 	defer svc.serviceLock.Unlock()

+	// Extract the service name and derive the service key.
+	svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
+	svcKey := endpointSlice.Namespace + "/" + svcName
+
+	if svc.serviceMap == nil {
+		svc.serviceMap = make(map[string]*corev1.Service)
+	}
+	var err error
+	if svc.serviceMap[svcKey] == nil {
+		svc.serviceMap[svcKey], err = t.Service.Client.CoreV1().Services(endpointSlice.Namespace).Get(t.Ctx, svcName, metav1.GetOptions{})
+		if err != nil {
+			svc.Log.Error("error getting service", "error", err)
+			return err
+		}
+	}
+
 	// Check if we care about endpoints for this service
-	if !svc.shouldTrackEndpoints(key) {
+	if !svc.shouldTrackEndpoints(svcKey) {
 		return nil
 	}

 	// We are tracking this service so let's keep track of the endpoints
-	if svc.endpointsMap == nil {
-		svc.endpointsMap = make(map[string]*corev1.Endpoints)
+	if svc.endpointSlicesMap == nil {
+		svc.endpointSlicesMap = make(map[string]map[string]*discoveryv1.EndpointSlice)
 	}
-	svc.endpointsMap[key] = endpoints
+	if _, ok := svc.endpointSlicesMap[svcKey]; !ok {
+		svc.endpointSlicesMap[svcKey] = make(map[string]*discoveryv1.EndpointSlice)
+	}
+	svc.endpointSlicesMap[svcKey][endptKey] = endpointSlice

 	// Update the registration and trigger a sync
-	svc.generateRegistrations(key)
+	svc.generateRegistrations(svcKey)
 	svc.sync()
-	svc.Log.Info("upsert endpoint", "key", key)
+	svc.Log.Info("upsert endpoint", "key", endptKey)
 	return nil
 }
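Both the `Upsert` above and the `Delete` below have to recover the owning service from the slice itself, via the well-known `kubernetes.io/service-name` label. A small sketch of that mapping (`serviceKeyForSlice` is our name for illustration; slices created outside a Service controller may not carry the label at all):

```go
package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
)

// serviceKeyForSlice derives the "<namespace>/<serviceName>" key used by the
// sync maps. The second return value is false for unmanaged slices.
func serviceKeyForSlice(slice *discoveryv1.EndpointSlice) (string, bool) {
	svcName := slice.Labels[discoveryv1.LabelServiceName]
	if svcName == "" {
		return "", false
	}
	return fmt.Sprintf("%s/%s", slice.Namespace, svcName), true
}
```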
-func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error {
+func (t *serviceEndpointsResource) Delete(endptKey string, raw interface{}) error {
+	endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
+	if !ok {
+		t.Service.Log.Error("delete got invalid type", "raw", raw)
+		return nil
+	}
+
 	t.Service.serviceLock.Lock()
 	defer t.Service.serviceLock.Unlock()

+	// Extract the service name and derive the service key.
+	svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
+	svcKey := endpointSlice.Namespace + "/" + svcName
+
 	// This is a bit of an optimization. We only want to force a resync
 	// if we were tracking this endpoint to begin with and that endpoint
 	// had associated registrations.
-	if _, ok := t.Service.endpointsMap[key]; ok {
-		delete(t.Service.endpointsMap, key)
-		if _, ok := t.Service.consulMap[key]; ok {
-			delete(t.Service.consulMap, key)
-			t.Service.sync()
+	if _, ok := t.Service.endpointSlicesMap[svcKey]; ok {
+		if _, ok := t.Service.endpointSlicesMap[svcKey][endptKey]; ok {
+			delete(t.Service.endpointSlicesMap[svcKey], endptKey)
+			if _, ok := t.Service.consulMap[svcKey]; ok {
+				delete(t.Service.consulMap, svcKey)
+				t.Service.sync()
+			}
 		}
 	}

-	t.Service.Log.Info("delete endpoint", "key", key)
+	t.Service.Log.Info("delete endpoint", "key", endptKey)
 	return nil
 }
diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go
index 3b8fb78497..3272849bd3 100644
--- a/control-plane/catalog/to-consul/resource_test.go
+++ b/control-plane/catalog/to-consul/resource_test.go
@@ -14,11 +14,14 @@ import (
 	"github.com/hashicorp/go-hclog"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/utils/pointer"
 )

 const nodeName1 = "ip-10-11-12-13.ec2.internal"
@@ -44,6 +47,9 @@ func TestServiceResource_createDelete(t *testing.T) {
 	_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
 	require.NoError(t, err)

+	createNodes(t, client)
+	createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault)
+
 	// Delete
 	require.NoError(t, client.CoreV1().Services(metav1.NamespaceDefault).Delete(context.Background(), "foo", metav1.DeleteOptions{}))

@@ -759,23 +765,36 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) {
 	node1, _ := createNodes(t, client)

-	// Insert the endpoints
-	_, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(
+	// Insert the endpoint slice
+	_, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create(
 		context.Background(),
-		&corev1.Endpoints{
+		&discoveryv1.EndpointSlice{
 			ObjectMeta: metav1.ObjectMeta{
-				Name: "foo",
+				GenerateName: "foo-",
+				Labels:       map[string]string{discoveryv1.LabelServiceName: "foo"},
 			},
-
-			Subsets: []corev1.EndpointSubset{
+			AddressType: discoveryv1.AddressTypeIPv4,
+			Endpoints: []discoveryv1.Endpoint{
 				{
-					Addresses: []corev1.EndpointAddress{
-						{NodeName: &node1.Name, IP: "8.8.8.8"},
-					},
-					Ports: []corev1.EndpointPort{
-						{Name: "http", Port: 8080},
-						{Name: "rpc", Port: 2000},
+					Addresses: []string{"8.8.8.8"},
+					Conditions: discoveryv1.EndpointConditions{
+						Ready:       pointer.Bool(true),
+						Serving:     pointer.Bool(true),
+						Terminating: pointer.Bool(false),
 					},
+					TargetRef: &corev1.ObjectReference{Kind: "pod", Name: "foo", Namespace: metav1.NamespaceDefault},
+					NodeName:  &node1.Name,
+					Zone:      pointer.String("us-west-2a"),
+				},
+			},
+			Ports: []discoveryv1.EndpointPort{
+				{
+					Name: pointer.String("http"),
+					Port: pointer.Int32(8080),
+				},
+				{
+					Name: pointer.String("rpc"),
+					Port: pointer.Int32(2000),
 				},
 			},
 		},
@@ -814,7 +833,7 @@ func TestServiceResource_nodePort(t *testing.T) {

 	createNodes(t, client)

-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
@@ -854,7 +873,7 @@ func TestServiceResource_nodePortPrefix(t *testing.T) {

 	createNodes(t, client)
-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
@@ -894,23 +913,36 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) {
 	node1, _ := createNodes(t, client)

-	// Insert the endpoints
-	_, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(
+	// Insert the endpoint slice
+	_, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create(
 		context.Background(),
-		&corev1.Endpoints{
+		&discoveryv1.EndpointSlice{
 			ObjectMeta: metav1.ObjectMeta{
-				Name: "foo",
+				GenerateName: "foo-",
+				Labels:       map[string]string{discoveryv1.LabelServiceName: "foo"},
 			},
-
-			Subsets: []corev1.EndpointSubset{
+			AddressType: discoveryv1.AddressTypeIPv4,
+			Endpoints: []discoveryv1.Endpoint{
 				{
-					Addresses: []corev1.EndpointAddress{
-						{NodeName: &node1.Name, IP: "1.2.3.4"},
-					},
-					Ports: []corev1.EndpointPort{
-						{Name: "http", Port: 8080},
-						{Name: "rpc", Port: 2000},
+					Addresses: []string{"1.2.3.4"},
+					Conditions: discoveryv1.EndpointConditions{
+						Ready:       pointer.Bool(true),
+						Serving:     pointer.Bool(true),
+						Terminating: pointer.Bool(false),
 					},
+					TargetRef: &corev1.ObjectReference{Kind: "pod", Name: "foo", Namespace: metav1.NamespaceDefault},
+					NodeName:  &node1.Name,
+					Zone:      pointer.String("us-west-2a"),
+				},
+			},
+			Ports: []discoveryv1.EndpointPort{
+				{
+					Name: pointer.String("http"),
+					Port: pointer.Int32(8080),
+				},
+				{
+					Name: pointer.String("rpc"),
+					Port: pointer.Int32(2000),
 				},
 			},
 		},
@@ -949,12 +981,12 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) {

 	createNodes(t, client)

-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
-
 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
 	svc.Annotations = map[string]string{annotationServicePort: "rpc"}
 	_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
 	require.NoError(t, err)

+	createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault)
+
 	// Verify what we got
@@ -989,7 +1021,7 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) {

 	createNodes(t, client)

-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
@@ -1034,7 +1066,7 @@ func TestServiceResource_nodePort_internalOnlySync(t *testing.T) {

 	createNodes(t, client)

-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
@@ -1082,7 +1114,7 @@ func TestServiceResource_nodePort_externalFirstSync(t *testing.T) {
 	_, err := client.CoreV1().Nodes().UpdateStatus(context.Background(), node1, metav1.UpdateOptions{})
 	require.NoError(t, err)

-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Insert the service
 	svc := nodePortService("foo", metav1.NamespaceDefault)
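The fixtures above set `Ready`, `Serving`, and `Terminating` explicitly because, unlike `Endpoints` subsets, which pre-partition addresses into ready and not-ready sets, EndpointSlice conditions are pointer-valued and optional. This change registers every listed address; if a consumer did want to filter on readiness, the API convention is that a nil `Ready` must be treated as ready. A hedged helper for that check (not part of this change):

```go
package main

import discoveryv1 "k8s.io/api/discovery/v1"

// isEndpointReady follows the EndpointSlice API convention that an unset
// (nil) Ready condition should be interpreted as ready.
func isEndpointReady(ep discoveryv1.Endpoint) bool {
	return ep.Conditions.Ready == nil || *ep.Conditions.Ready
}
```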
client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1139,6 +1173,8 @@ func TestServiceResource_clusterIP(t *testing.T) { require.Equal(r, "foo", actual[1].Service.Service) require.Equal(r, "2.2.2.2", actual[1].Service.Address) require.Equal(r, 8080, actual[1].Service.Port) + require.Equal(r, "us-west-2a", actual[0].Service.Meta["external-k8s-topology-zone"]) + require.Equal(r, "us-west-2b", actual[1].Service.Meta["external-k8s-topology-zone"]) require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) }) } @@ -1160,8 +1196,10 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1198,8 +1236,10 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1236,8 +1276,10 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1274,8 +1316,10 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1314,8 +1358,10 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1351,8 +1397,10 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) require.NoError(t, err) - // Insert the endpoints - createEndpoints(t, client, "foo", metav1.NamespaceDefault) + createNodes(t, client) + + // Insert the endpoint slice + createEndpointSlice(t, client, "foo", metav1.NamespaceDefault) // Verify what we got retry.Run(t, func(r *retry.R) { @@ -1381,8 +1429,10 @@ func 
@@ -1381,8 +1429,10 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) {
 	_, err := client.CoreV1().Services(testNamespace).Create(context.Background(), svc, metav1.CreateOptions{})
 	require.NoError(t, err)

-	// Insert the endpoints
-	createEndpoints(t, client, "foo", testNamespace)
+	createNodes(t, client)
+
+	// Insert the endpoint slice
+	createEndpointSlice(t, client, "foo", testNamespace)

 	// Verify what we got
 	retry.Run(t, func(r *retry.R) {
@@ -1422,8 +1472,10 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) {
 	_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
 	require.NoError(t, err)

-	// Insert the endpoints
-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createNodes(t, client)
+
+	// Insert the endpoint slice
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Verify what we got
 	retry.Run(t, func(r *retry.R) {
@@ -1458,8 +1510,10 @@ func TestServiceResource_targetRefInMeta(t *testing.T) {
 	_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
 	require.NoError(t, err)

-	// Insert the endpoints
-	createEndpoints(t, client, "foo", metav1.NamespaceDefault)
+	createNodes(t, client)
+
+	// Insert the endpoint slice
+	createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

 	// Verify what we got
 	retry.Run(t, func(r *retry.R) {
@@ -1945,7 +1999,10 @@ func TestServiceResource_addIngress(t *testing.T) {
 	// Create the ingress
 	_, err = client.NetworkingV1().Ingresses(metav1.NamespaceDefault).Create(context.Background(), test.ingress, metav1.CreateOptions{})
 	require.NoError(t, err)
-	createEndpoints(t, client, "test-service", metav1.NamespaceDefault)
+
+	createNodes(t, client)
+	createEndpointSlice(t, client, "test-service", metav1.NamespaceDefault)
+
 	// Verify that the service name annotation is preferred
 	retry.Run(t, func(r *retry.R) {
 		syncer.Lock()
@@ -2066,43 +2123,55 @@ func createNodes(t *testing.T, client *fake.Clientset) (*corev1.Node, *corev1.Node) {
 	return node1, node2
 }

-// createEndpoints calls the fake k8s client to create two endpoints on two nodes.
-func createEndpoints(t *testing.T, client *fake.Clientset, serviceName string, namespace string) {
+// createEndpointSlice calls the fake k8s client to create one EndpointSlice with two endpoints on different nodes.
+func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName string, namespace string) {
 	node1 := nodeName1
 	node2 := nodeName2
 	targetRef := corev1.ObjectReference{Kind: "pod", Name: "foobar"}
-	_, err := client.CoreV1().Endpoints(namespace).Create(
+
+	_, err := client.DiscoveryV1().EndpointSlices(namespace).Create(
 		context.Background(),
-		&corev1.Endpoints{
+		&discoveryv1.EndpointSlice{
 			ObjectMeta: metav1.ObjectMeta{
-				Name:      serviceName,
-				Namespace: namespace,
+				Labels: map[string]string{discoveryv1.LabelServiceName: serviceName},
+				Name:   serviceName + "-" + rand.String(5),
 			},
-
-			Subsets: []corev1.EndpointSubset{
+			AddressType: discoveryv1.AddressTypeIPv4,
+			Endpoints: []discoveryv1.Endpoint{
 				{
-					Addresses: []corev1.EndpointAddress{
-						{NodeName: &node1, IP: "1.1.1.1", TargetRef: &targetRef},
-					},
-					Ports: []corev1.EndpointPort{
-						{Name: "http", Port: 8080},
-						{Name: "rpc", Port: 2000},
+					Addresses: []string{"1.1.1.1"},
+					Conditions: discoveryv1.EndpointConditions{
+						Ready:       pointer.Bool(true),
+						Serving:     pointer.Bool(true),
+						Terminating: pointer.Bool(false),
 					},
+					TargetRef: &targetRef,
+					NodeName:  &node1,
+					Zone:      pointer.String("us-west-2a"),
 				},
 				{
-					Addresses: []corev1.EndpointAddress{
-						{NodeName: &node2, IP: "2.2.2.2"},
-					},
-					Ports: []corev1.EndpointPort{
-						{Name: "http", Port: 8080},
-						{Name: "rpc", Port: 2000},
+					Addresses: []string{"2.2.2.2"},
+					Conditions: discoveryv1.EndpointConditions{
+						Ready:       pointer.Bool(true),
+						Serving:     pointer.Bool(true),
+						Terminating: pointer.Bool(false),
 					},
+					NodeName: &node2,
+					Zone:     pointer.String("us-west-2b"),
+				},
+			},
+			Ports: []discoveryv1.EndpointPort{
+				{
+					Name: pointer.String("http"),
+					Port: pointer.Int32(8080),
+				},
+				{
+					Name: pointer.String("rpc"),
+					Port: pointer.Int32(2000),
 				},
 			},
 		},
 		metav1.CreateOptions{})
-
 	require.NoError(t, err)
 }
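For anyone extending these tests, the helper above can be exercised directly against the fake clientset, and the label selector used by the sync code will find the slice again. A minimal, self-contained sketch under those assumptions (names are illustrative):

```go
package main

import (
	"context"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/utils/pointer"
)

func main() {
	client := fake.NewSimpleClientset()
	slice := &discoveryv1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "foo-abc12",
			Labels: map[string]string{discoveryv1.LabelServiceName: "foo"},
		},
		AddressType: discoveryv1.AddressTypeIPv4,
		Endpoints: []discoveryv1.Endpoint{{
			Addresses: []string{"1.1.1.1"},
			Zone:      pointer.String("us-west-2a"),
		}},
		Ports: []discoveryv1.EndpointPort{{
			Name: pointer.String("http"),
			Port: pointer.Int32(8080),
		}},
	}
	if _, err := client.DiscoveryV1().EndpointSlices("default").Create(context.Background(), slice, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// The same selector the sync code builds from LabelServiceName.
	list, err := client.DiscoveryV1().EndpointSlices("default").List(context.Background(),
		metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=foo"})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(list.Items)) // prints 1
}
```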