Skip to content

Commit

Permalink
Cleanup and changelog entry
Browse files Browse the repository at this point in the history
  • Loading branch information
jukie authored and jmurret committed Mar 7, 2024
1 parent 4a3c209 commit c11b034
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 51 deletions.
3 changes: 3 additions & 0 deletions .changelog/3693.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
catalog: Topology Zone information is now read from the Kubernetes endpoints and added to registered consul services under Meta.
```
82 changes: 50 additions & 32 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ type ServiceResource struct {
// in the form <kube namespace>/<kube svc name>.
serviceMap map[string]*corev1.Service

// endpointSliceListMap uses the same keys as serviceMap but maps to the EndpointSliceList
// endpointSlicesMap uses the same keys as serviceMap but maps to a list of EndpointSlices
// of each service.
endpointSliceListMap map[string][]discoveryv1.EndpointSlice
endpointSlicesMap map[string][]discoveryv1.EndpointSlice

// EnableIngress enables syncing of the hostname from an Ingress resource
// to the service registration if an Ingress rule matches the service.
Expand Down Expand Up @@ -201,7 +201,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
// We expect a Service. If it isn't a service then just ignore it.
service, ok := raw.(*corev1.Service)
if !ok {
t.Log.Warn("upsert got invalid type svc", "raw", raw)
t.Log.Warn("upsert got invalid type", "raw", raw)
return nil
}

Expand All @@ -227,26 +227,45 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
t.serviceMap[key] = service
t.Log.Debug("[ServiceResource.Upsert] adding service to serviceMap", "key", key, "service", service)

// If we care about endpoints, we should do the initial endpoints load.
// If we care about endpoints, we should load the associated endpoint slices.
if t.shouldTrackEndpoints(key) {

// account for continue
endpointSliceList, err := t.Client.DiscoveryV1().
EndpointSlices(service.Namespace).
List(t.Ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)})
var allEndpointSlices []discoveryv1.EndpointSlice
labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)
continueToken := ""
limit := int64(100)

if err != nil {
t.Log.Warn("error loading initial endpoint slices list",
"key", key,
"err", err)
return err // Ensure to handle or return the error appropriately
} else {
if t.endpointSliceListMap == nil {
t.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
for {
opts := metav1.ListOptions{
LabelSelector: labelSelector,
Limit: limit,
Continue: continueToken,
}
endpointSliceList, err := t.Client.DiscoveryV1().
EndpointSlices(service.Namespace).
List(t.Ctx, opts)

if err != nil {
t.Log.Warn("error loading endpoint slices list",
"key", key,
"err", err)
return err
}
t.endpointSliceListMap[key] = endpointSliceList.Items
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSliceListMap", "key", key, "service", service, "endpointSliceList", endpointSliceList)

allEndpointSlices = append(allEndpointSlices, endpointSliceList.Items...)

if endpointSliceList.Continue != "" {
continueToken = endpointSliceList.Continue
} else {
break
}
}

if t.endpointSlicesMap == nil {
t.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
}
t.endpointSlicesMap[key] = allEndpointSlices
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
}

// Update the registration and trigger a sync
Expand All @@ -271,8 +290,8 @@ func (t *ServiceResource) Delete(key string, _ interface{}) error {
func (t *ServiceResource) doDelete(key string) {
delete(t.serviceMap, key)
t.Log.Debug("[doDelete] deleting service from serviceMap", "key", key)
delete(t.endpointSliceListMap, key)
t.Log.Debug("[doDelete] deleting endpoints from endpointSliceListMap", "key", key)
delete(t.endpointSlicesMap, key)
t.Log.Debug("[doDelete] deleting endpoints from endpointSlicesMap", "key", key)
// If there were registrations related to this service, then
// delete them and sync.
if _, ok := t.consulMap[key]; ok {
Expand Down Expand Up @@ -588,11 +607,11 @@ func (t *ServiceResource) generateRegistrations(key string) {
// pods are running on. This way we don't register _every_ K8S
// node as part of the service.
case corev1.ServiceTypeNodePort:
if t.endpointSliceListMap == nil {
if t.endpointSlicesMap == nil {
return
}

endpointSliceList := t.endpointSliceListMap[key]
endpointSliceList := t.endpointSlicesMap[key]
if endpointSliceList == nil {
return
}
Expand Down Expand Up @@ -684,11 +703,11 @@ func (t *ServiceResource) registerServiceInstance(
overridePortNumber int,
useHostname bool) {

if t.endpointSliceListMap == nil {
if t.endpointSlicesMap == nil {
return
}

endpointSliceList := t.endpointSliceListMap[key]
endpointSliceList := t.endpointSlicesMap[key]
if endpointSliceList == nil {
return
}
Expand Down Expand Up @@ -849,7 +868,7 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {

endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
if !ok {
svc.Log.Warn("upsert got invalid type for ep", "raw", raw)
svc.Log.Warn("upsert got invalid type", "raw", raw)
}

svc.serviceLock.Lock()
Expand All @@ -861,11 +880,10 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
}

// We are tracking this service so let's keep track of the endpoints
if svc.endpointSliceListMap == nil {
svc.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
if svc.endpointSlicesMap == nil {
svc.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
}
//svc.endpointSliceListMap[key] = &discoveryv1.EndpointSliceList{Items: []discoveryv1.EndpointSlice{*endpointSliceList}}
svc.endpointSliceListMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}
svc.endpointSlicesMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}

// Update the registration and trigger a sync
svc.generateRegistrations(key)
Expand All @@ -881,8 +899,8 @@ func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error {
// This is a bit of an optimization. We only want to force a resync
// if we were tracking this endpoint to begin with and that endpoint
// had associated registrations.
if _, ok := t.Service.endpointSliceListMap[key]; ok {
delete(t.Service.endpointSliceListMap, key)
if _, ok := t.Service.endpointSlicesMap[key]; ok {
delete(t.Service.endpointSlicesMap, key)
if _, ok := t.Service.consulMap[key]; ok {
delete(t.Service.consulMap, key)
t.Service.sync()
Expand Down Expand Up @@ -932,7 +950,7 @@ func (t *serviceIngressResource) Upsert(key string, raw interface{}) error {
svc := t.Service
ingress, ok := raw.(*networkingv1.Ingress)
if !ok {
svc.Log.Warn("upsert got invalid type ing", "raw", raw)
svc.Log.Warn("upsert got invalid type", "raw", raw)
return nil
}

Expand Down
33 changes: 14 additions & 19 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) {

node1, _ := createNodes(t, client)

// Insert the endpoint slices
// Insert the endpoint slice
_, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create(
context.Background(),
&discoveryv1.EndpointSlice{
Expand Down Expand Up @@ -794,8 +794,7 @@ func TestServiceResource_lbRegisterEndpoints(t *testing.T) {
},
},
},
metav1.CreateOptions{},
)
metav1.CreateOptions{})
require.NoError(t, err)

// Insert an LB service
Expand Down Expand Up @@ -911,7 +910,6 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) {
node1, _ := createNodes(t, client)

// Insert the endpoint slice

_, err := client.DiscoveryV1().EndpointSlices(metav1.NamespaceDefault).Create(
context.Background(),
&discoveryv1.EndpointSlice{
Expand Down Expand Up @@ -944,9 +942,7 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) {
},
},
},
metav1.CreateOptions{},
)

metav1.CreateOptions{})
require.NoError(t, err)

// Insert the service
Expand Down Expand Up @@ -1156,7 +1152,7 @@ func TestServiceResource_clusterIP(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1194,7 +1190,7 @@ func TestServiceResource_clusterIP_healthCheck(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1232,7 +1228,7 @@ func TestServiceResource_clusterIPPrefix(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1270,7 +1266,7 @@ func TestServiceResource_clusterIPAnnotatedPortName(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1308,7 +1304,7 @@ func TestServiceResource_clusterIPAnnotatedPortNumber(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1348,7 +1344,7 @@ func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1385,7 +1381,7 @@ func TestServiceResource_clusterIPSyncDisabled(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1415,7 +1411,7 @@ func TestServiceResource_clusterIPAllNamespaces(t *testing.T) {
_, err := client.CoreV1().Services(testNamespace).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", testNamespace)

// Verify what we got
Expand Down Expand Up @@ -1456,7 +1452,7 @@ func TestServiceResource_clusterIPTargetPortNamed(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -1492,7 +1488,7 @@ func TestServiceResource_targetRefInMeta(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
// Insert the endpoint slice
createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
Expand Down Expand Up @@ -2148,8 +2144,7 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin
},
},
},
metav1.CreateOptions{},
)
metav1.CreateOptions{})
require.NoError(t, err)
}

Expand Down

0 comments on commit c11b034

Please sign in to comment.