Skip to content

Commit

Permalink
[NET-7656] Add GatewayClassConfig watch for MeshGateway controller (#…
Browse files Browse the repository at this point in the history
…3537)

* Add GatewayClass[Config] watches for MeshGateway controller

* Update merge logic for deployment + service

* Add test coverage for MergeDeployment

* Add test coverage for MergeService

* Copy over owner references to new Service + Deployment
  • Loading branch information
nathancoleman committed Feb 6, 2024
1 parent be8ef00 commit 95dd321
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 84 deletions.
191 changes: 121 additions & 70 deletions control-plane/controllers/resources/mesh_gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

meshv2beta1 "github.com/hashicorp/consul-k8s/control-plane/api/mesh/v2beta1"
"github.com/hashicorp/consul-k8s/control-plane/gateways"
Expand Down Expand Up @@ -84,6 +87,49 @@ func (r *MeshGatewayController) SetupWithManager(mgr ctrl.Manager) error {
Owns(&rbacv1.RoleBinding{}).
Owns(&corev1.Service{}).
Owns(&corev1.ServiceAccount{}).
Watches(
source.NewKindWithCache(&meshv2beta1.GatewayClass{}, mgr.GetCache()),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
gateways, err := r.getGatewaysReferencingGatewayClass(context.Background(), o.(*meshv2beta1.GatewayClass))
if err != nil {
return nil
}

requests := make([]reconcile.Request, 0, len(gateways.Items))
for _, gateway := range gateways.Items {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: gateway.Namespace,
Name: gateway.Name,
}})
}

return requests
})).
Watches(
source.NewKindWithCache(&meshv2beta1.GatewayClassConfig{}, mgr.GetCache()),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
classes, err := r.getGatewayClassesReferencingGatewayClassConfig(context.Background(), o.(*meshv2beta1.GatewayClassConfig))
if err != nil {
return nil
}

var requests []reconcile.Request
for _, class := range classes.Items {
gateways, err := r.getGatewaysReferencingGatewayClass(context.Background(), class)
if err != nil {
continue
}

for _, gateway := range gateways.Items {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: gateway.Namespace,
Name: gateway.Name,
}})
}
}

return requests
})).
Complete(r)
}

Expand All @@ -96,7 +142,7 @@ func (r *MeshGatewayController) SetupWithManager(mgr ctrl.Manager) error {
// 4. Role
// 5. RoleBinding
func (r *MeshGatewayController) onCreateUpdate(ctx context.Context, req ctrl.Request, resource *meshv2beta1.MeshGateway) error {
// fetch gatewayclassconfig
// Fetch GatewayClassConfig for the gateway
gcc, err := r.getGatewayClassConfigForGateway(ctx, resource)
if err != nil {
r.Log.Error(err, "unable to get gatewayclassconfig for gateway: %s gatewayclass: %s", resource.Name, resource.Spec.GatewayClassName)
Expand All @@ -105,77 +151,77 @@ func (r *MeshGatewayController) onCreateUpdate(ctx context.Context, req ctrl.Req

builder := gateways.NewMeshGatewayBuilder(resource, r.GatewayConfig, gcc)

// Create ServiceAccount
desiredAccount := builder.ServiceAccount()
existingAccount := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: desiredAccount.Namespace, Name: desiredAccount.Name}}

upsertOp := func(ctx context.Context, _, object client.Object) error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, object, func() error { return nil })
return err
}

err = r.opIfNewOrOwned(ctx, resource, &corev1.ServiceAccount{}, builder.ServiceAccount(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingAccount, desiredAccount, upsertOp)
if err != nil {
return fmt.Errorf("unable to create service account: %w", err)
}

// Create Role
desiredRole := builder.Role()
existingRole := &rbacv1.Role{ObjectMeta: metav1.ObjectMeta{Namespace: desiredRole.Namespace, Name: desiredRole.Name}}

err = r.opIfNewOrOwned(ctx, resource, &rbacv1.Role{}, builder.Role(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingRole, desiredRole, upsertOp)
if err != nil {
return fmt.Errorf("unable to create role: %w", err)
}

// Create RoleBinding
desiredBinding := builder.RoleBinding()
existingBinding := &rbacv1.RoleBinding{ObjectMeta: metav1.ObjectMeta{Namespace: desiredBinding.Namespace, Name: desiredBinding.Name}}

err = r.opIfNewOrOwned(ctx, resource, &rbacv1.RoleBinding{}, builder.RoleBinding(), upsertOp)
err = r.opIfNewOrOwned(ctx, resource, existingBinding, desiredBinding, upsertOp)
if err != nil {
return fmt.Errorf("unable to create role binding: %w", err)
}

// Create Service
desiredService := builder.Service()
existingService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: desiredService.Namespace, Name: desiredService.Name}}

mergeServiceOp := func(ctx context.Context, existingObject, object client.Object) error {
existingService, ok := existingObject.(*corev1.Service)
if !ok && existingService != nil {
return fmt.Errorf("unable to infer existing service type")
}
builtService, ok := object.(*corev1.Service)
if !ok {
return fmt.Errorf("unable to infer built service type")
}

mergedService := mergeService(existingService, builtService)
mergeServiceOp := func(ctx context.Context, existingObj, desiredObj client.Object) error {
existing := existingObj.(*corev1.Service)
desired := desiredObj.(*corev1.Service)

_, err := controllerutil.CreateOrUpdate(ctx, r.Client, mergedService, func() error { return nil })
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, existing, func() error {
gateways.MergeService(existing, desired)
return nil
})
return err
}

err = r.opIfNewOrOwned(ctx, resource, &corev1.Service{}, builder.Service(), mergeServiceOp)
err = r.opIfNewOrOwned(ctx, resource, existingService, desiredService, mergeServiceOp)
if err != nil {
return fmt.Errorf("unable to create service: %w", err)
}

// Create deployment

mergeDeploymentOp := func(ctx context.Context, existingObject, object client.Object) error {
existingDeployment, ok := existingObject.(*appsv1.Deployment)
if !ok && existingDeployment != nil {
return fmt.Errorf("unable to infer existing deployment type")
}
builtDeployment, ok := object.(*appsv1.Deployment)
if !ok {
return fmt.Errorf("unable to infer built deployment type")
}
// Create Deployment
desiredDeployment, err := builder.Deployment()
if err != nil {
return fmt.Errorf("unable to create deployment: %w", err)
}
existingDeployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: desiredDeployment.Namespace, Name: desiredDeployment.Name}}

mergedDeployment := builder.MergeDeployments(gcc, existingDeployment, builtDeployment)
mergeDeploymentOp := func(ctx context.Context, existingObj, desiredObj client.Object) error {
existing := existingObj.(*appsv1.Deployment)
desired := desiredObj.(*appsv1.Deployment)

_, err := controllerutil.CreateOrUpdate(ctx, r.Client, mergedDeployment, func() error { return nil })
_, err = controllerutil.CreateOrUpdate(ctx, r.Client, existing, func() error {
gateways.MergeDeployment(existing, desired)
return nil
})
return err
}

builtDeployment, err := builder.Deployment()
if err != nil {
return fmt.Errorf("unable to build deployment: %w", err)
}

err = r.opIfNewOrOwned(ctx, resource, &appsv1.Deployment{}, builtDeployment, mergeDeploymentOp)
err = r.opIfNewOrOwned(ctx, resource, existingDeployment, desiredDeployment, mergeDeploymentOp)
if err != nil {
return fmt.Errorf("unable to create deployment: %w", err)
}
Expand All @@ -199,24 +245,24 @@ func (r *MeshGatewayController) onDelete(ctx context.Context, req ctrl.Request,
// The existing and new object are available in case any merging needs
// to occur, such as unknown annotations and values from the existing object
// that need to be carried forward onto the new object.
type ownedObjectOp func(ctx context.Context, existingObject client.Object, newObject client.Object) error
type ownedObjectOp func(ctx context.Context, existing, desired client.Object) error

// opIfNewOrOwned runs a given ownedObjectOp to create, update, or delete a resource.
// The purpose of opIfNewOrOwned is to ensure that we aren't updating or deleting a
// resource that was not created by us. If this scenario is encountered, we error.
func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *meshv2beta1.MeshGateway, scanTarget, writeSource client.Object, op ownedObjectOp) error {
func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *meshv2beta1.MeshGateway, existing, desired client.Object, op ownedObjectOp) error {
// Ensure owner reference is always set on objects that we write
if err := ctrl.SetControllerReference(gateway, writeSource, r.Client.Scheme()); err != nil {
if err := ctrl.SetControllerReference(gateway, desired, r.Client.Scheme()); err != nil {
return err
}

key := client.ObjectKey{
Namespace: writeSource.GetNamespace(),
Name: writeSource.GetName(),
Namespace: existing.GetNamespace(),
Name: existing.GetName(),
}

exists := false
if err := r.Get(ctx, key, scanTarget); err != nil {
if err := r.Get(ctx, key, existing); err != nil {
// We failed to fetch the object in a way that doesn't tell us about its existence
if !k8serr.IsNotFound(err) {
return err
Expand All @@ -228,12 +274,12 @@ func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *mes

// None exists, so we need only execute the operation
if !exists {
return op(ctx, nil, writeSource)
return op(ctx, existing, desired)
}

// Ensure the existing object was put there by us so that we don't overwrite random objects
owned := false
for _, reference := range scanTarget.GetOwnerReferences() {
for _, reference := range existing.GetOwnerReferences() {
if reference.UID == gateway.GetUID() && reference.Name == gateway.GetName() {
owned = true
break
Expand All @@ -242,7 +288,7 @@ func (r *MeshGatewayController) opIfNewOrOwned(ctx context.Context, gateway *mes
if !owned {
return errResourceNotOwned
}
return op(ctx, scanTarget, writeSource)
return op(ctx, existing, desired)
}

func (r *MeshGatewayController) getGatewayClassConfigForGateway(ctx context.Context, gateway *meshv2beta1.MeshGateway) (*meshv2beta1.GatewayClassConfig, error) {
Expand Down Expand Up @@ -288,39 +334,44 @@ func (r *MeshGatewayController) getGatewayClassForGateway(ctx context.Context, g
return &gatewayClass, nil
}

func areServicesEqual(a, b *corev1.Service) bool {
// If either service "a" or "b" is nil, don't want to try and merge the nil service
if a == nil || b == nil {
return true
}

if !equality.Semantic.DeepEqual(a.Annotations, b.Annotations) {
return false
// getGatewayClassesReferencingGatewayClassConfig queries all GatewayClass resources in the
// cluster and returns any that reference the given GatewayClassConfig.
func (r *MeshGatewayController) getGatewayClassesReferencingGatewayClassConfig(ctx context.Context, config *meshv2beta1.GatewayClassConfig) (*meshv2beta1.GatewayClassList, error) {
if config == nil {
return nil, nil
}

if len(b.Spec.Ports) != len(a.Spec.Ports) {
return false
allClasses := &meshv2beta1.GatewayClassList{}
if err := r.Client.List(ctx, allClasses); err != nil {
return nil, client.IgnoreNotFound(err)
}

for i, port := range a.Spec.Ports {
otherPort := b.Spec.Ports[i]
if port.Port != otherPort.Port || port.Protocol != otherPort.Protocol {
return false
matchingClasses := &meshv2beta1.GatewayClassList{}
for _, class := range allClasses.Items {
if class.Spec.ParametersRef != nil && class.Spec.ParametersRef.Name == config.Name {
matchingClasses.Items = append(matchingClasses.Items, class)
}
}
return true
return matchingClasses, nil
}

// mergeService is used to keep annotations and ports from the `from` Service
// to the `to` service. This prevents an infinite reconciliation loop when
// Kubernetes adds this configuration back in.
func mergeService(from, to *corev1.Service) *corev1.Service {
if areServicesEqual(from, to) {
return to
// getGatewaysReferencingGatewayClass queries all MeshGateway resources in the cluster
// and returns any that reference the given GatewayClass.
func (r *MeshGatewayController) getGatewaysReferencingGatewayClass(ctx context.Context, class *meshv2beta1.GatewayClass) (*meshv2beta1.MeshGatewayList, error) {
if class == nil {
return nil, nil
}

to.Annotations = from.Annotations
to.Spec.Ports = from.Spec.Ports
allGateways := &meshv2beta1.MeshGatewayList{}
if err := r.Client.List(ctx, allGateways); err != nil {
return nil, client.IgnoreNotFound(err)
}

return to
matchingGateways := &meshv2beta1.MeshGatewayList{}
for _, gateway := range allGateways.Items {
if gateway.Spec.GatewayClassName == class.Name {
matchingGateways.Items = append(matchingGateways.Items, gateway)
}
}
return matchingGateways, nil
}
48 changes: 35 additions & 13 deletions control-plane/gateways/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,10 @@ func (b *meshGatewayBuilder) deploymentSpec() (*appsv1.DeploymentSpec, error) {
}, nil
}

func (b *meshGatewayBuilder) MergeDeployments(gcc *meshv2beta1.GatewayClassConfig, old, new *appsv1.Deployment) *appsv1.Deployment {
if old == nil {
return new
}
if !compareDeployments(old, new) {
old.Spec.Template = new.Spec.Template
new.Spec.Replicas = deploymentReplicaCount(nil, old.Spec.Replicas)
}

return new
}

func compareDeployments(a, b *appsv1.Deployment) bool {
// areDeploymentsEqual determines whether two Deployments are the same in
// the ways that we care about. This specifically ignores valid out-of-band
// changes such as initContainer injection.
func areDeploymentsEqual(a, b *appsv1.Deployment) bool {
// since K8s adds a bunch of defaults when we create a deployment, check that
// they don't differ by the things that we may actually change, namely container
// ports
Expand Down Expand Up @@ -183,3 +174,34 @@ func deploymentReplicaCount(replicas *meshv2beta1.GatewayClassReplicasConfig, cu
// otherwise use the global default
return pointer.Int32(globalDefaultInstances)
}

// MergeDeployment is used to update an appsv1.Deployment without overwriting any
// existing annotations or labels that were placed there by other vendors.
//
// based on https://github.com/kubernetes-sigs/controller-runtime/blob/4000e996a202917ad7d40f02ed8a2079a9ce25e9/pkg/controller/controllerutil/example_test.go
func MergeDeployment(existing, desired *appsv1.Deployment) {
// Only overwrite fields if the Deployment doesn't exist yet
if existing.ObjectMeta.CreationTimestamp.IsZero() {
existing.ObjectMeta.OwnerReferences = desired.ObjectMeta.OwnerReferences
existing.Spec = desired.Spec
existing.Annotations = desired.Annotations
existing.Labels = desired.Labels
return
}

// Make sure we don't reconcile forever by overwriting valid out-of-band
// changes such as init container injection. If the deployments are
// sufficiently equal, we only update the annotations.
if !areDeploymentsEqual(existing, desired) {
desired.Spec.Replicas = deploymentReplicaCount(nil, existing.Spec.Replicas)
existing.Spec = desired.Spec
}

// If the Deployment already exists, add any desired annotations + labels to existing set
for k, v := range desired.ObjectMeta.Annotations {
existing.ObjectMeta.Annotations[k] = v
}
for k, v := range desired.ObjectMeta.Labels {
existing.ObjectMeta.Labels[k] = v
}
}
Loading

0 comments on commit 95dd321

Please sign in to comment.