From 3edd5beb899ea4718c3d2da67a5e7adc3550b247 Mon Sep 17 00:00:00 2001
From: Olivier Cazade <ocazade@redhat.com>
Date: Tue, 10 May 2022 17:34:57 +0200
Subject: [PATCH] Refactoring to split cluster role in two (ingestor and
 transformer)

---
 controllers/flowlogspipeline/flp_objects.go   |  51 ++++++--
 .../flowlogspipeline/flp_reconciler.go        | 112 ++++++++++++------
 2 files changed, 113 insertions(+), 50 deletions(-)

diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go
index 0bb267160..44c5a8994 100644
--- a/controllers/flowlogspipeline/flp_objects.go
+++ b/controllers/flowlogspipeline/flp_objects.go
@@ -190,7 +190,7 @@ func (b *builder) podTemplate(configDigest string) corev1.PodTemplateSpec {
 				},
 			}},
 			Containers:         []corev1.Container{container},
-			ServiceAccountName: constants.FLPName,
+			ServiceAccountName: constants.FLPName + b.confKindSuffix,
 		},
 	}
 }
@@ -410,10 +410,10 @@ func buildAppLabel(confKind string) map[string]string {
 	}
 }
 
-func buildClusterRole() *rbacv1.ClusterRole {
+func buildClusterRoleIngester() *rbacv1.ClusterRole {
 	return &rbacv1.ClusterRole{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:   constants.FLPName,
+			Name:   constants.FLPName + FlpConfSuffix[ConfKafkaIngester],
 			Labels: buildAppLabel(""),
 		},
 		Rules: []rbacv1.PolicyRule{{
@@ -437,31 +437,60 @@ func buildClusterRole() *rbacv1.ClusterRole {
 	}
 }
 
-func buildServiceAccount(ns string) *corev1.ServiceAccount {
+func buildClusterRoleTransformer() *rbacv1.ClusterRole {
+	return &rbacv1.ClusterRole{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   constants.FLPName + FlpConfSuffix[ConfKafkaTransformer],
+			Labels: buildAppLabel(""),
+		},
+		Rules: []rbacv1.PolicyRule{{
+			APIGroups: []string{""},
+			Verbs:     []string{"list", "get", "watch"},
+			Resources: []string{"pods", "services", "nodes"},
+		}, {
+			APIGroups: []string{"apps"},
+			Verbs:     []string{"list", "get", "watch"},
+			Resources: []string{"replicasets"},
+		}, {
+			APIGroups: []string{"autoscaling"},
+			Verbs:     []string{"create", "delete", "patch", "update", "get", "watch", "list"},
+			Resources: []string{"horizontalpodautoscalers"},
+		}, {
+			APIGroups:     []string{"security.openshift.io"},
+			Verbs:         []string{"use"},
+			Resources:     []string{"securitycontextconstraints"},
+			ResourceNames: []string{"hostnetwork"},
+		}},
+	}
+}
+
+func (b *builder) serviceAccount() *corev1.ServiceAccount {
 	return &corev1.ServiceAccount{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      constants.FLPName,
-			Namespace: ns,
+			Name:      constants.FLPName + b.confKindSuffix,
+			Namespace: b.namespace,
 			Labels:    buildAppLabel(""),
 		},
 	}
 }
 
-func buildClusterRoleBinding(ns string) *rbacv1.ClusterRoleBinding {
+func (b *builder) clusterRoleBinding(roleKind string) *rbacv1.ClusterRoleBinding {
+	//Adding role here to disembiguate between the deployment kind and the role binded
+	name := constants.FLPName + b.confKindSuffix + FlpConfSuffix[roleKind] + "role"
 	return &rbacv1.ClusterRoleBinding{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:   constants.FLPName,
+			Name:   name,
 			Labels: buildAppLabel(""),
 		},
 		RoleRef: rbacv1.RoleRef{
 			APIGroup: "rbac.authorization.k8s.io",
 			Kind:     "ClusterRole",
-			Name:     constants.FLPName,
+			Name:     constants.FLPName + FlpConfSuffix[roleKind],
 		},
 		Subjects: []rbacv1.Subject{{
 			Kind:      "ServiceAccount",
-			Name:      constants.FLPName,
-			Namespace: ns,
+			Name:      constants.FLPName + b.confKindSuffix,
+			Namespace: b.namespace,
 		}},
 	}
 }
diff --git a/controllers/flowlogspipeline/flp_reconciler.go b/controllers/flowlogspipeline/flp_reconciler.go
index 1d73dd09b..4ad54ac5a 100644
--- a/controllers/flowlogspipeline/flp_reconciler.go
+++ b/controllers/flowlogspipeline/flp_reconciler.go
@@ -9,7 +9,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	ascv2 "k8s.io/api/autoscaling/v2beta2"
 	corev1 "k8s.io/api/core/v1"
-	v1 "k8s.io/api/rbac/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 
@@ -37,12 +37,14 @@ type singleDeploymentReconciler struct {
 }
 
 type ownedObjects struct {
-	deployment     *appsv1.Deployment
-	daemonSet      *appsv1.DaemonSet
-	service        *corev1.Service
-	hpa            *ascv2.HorizontalPodAutoscaler
-	serviceAccount *corev1.ServiceAccount
-	configMap      *corev1.ConfigMap
+	deployment             *appsv1.Deployment
+	daemonSet              *appsv1.DaemonSet
+	service                *corev1.Service
+	hpa                    *ascv2.HorizontalPodAutoscaler
+	serviceAccount         *corev1.ServiceAccount
+	configMap              *corev1.ConfigMap
+	roleBindingIngester    *rbacv1.ClusterRoleBinding
+	roleBindingTransformer *rbacv1.ClusterRoleBinding
 }
 
 func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler {
@@ -61,20 +63,29 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS string) FLPReconciler
 
 func newSingleReconciler(cl reconcilers.ClientHelper, ns string, prevNS string, confKind string) singleDeploymentReconciler {
 	owned := ownedObjects{
-		deployment:     &appsv1.Deployment{},
-		daemonSet:      &appsv1.DaemonSet{},
-		service:        &corev1.Service{},
-		hpa:            &ascv2.HorizontalPodAutoscaler{},
-		serviceAccount: &corev1.ServiceAccount{},
-		configMap:      &corev1.ConfigMap{},
+		deployment:             &appsv1.Deployment{},
+		daemonSet:              &appsv1.DaemonSet{},
+		service:                &corev1.Service{},
+		hpa:                    &ascv2.HorizontalPodAutoscaler{},
+		serviceAccount:         &corev1.ServiceAccount{},
+		configMap:              &corev1.ConfigMap{},
+		roleBindingIngester:    &rbacv1.ClusterRoleBinding{},
+		roleBindingTransformer: &rbacv1.ClusterRoleBinding{},
 	}
 	nobjMngr := reconcilers.NewNamespacedObjectManager(cl, ns, prevNS)
 	nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.deployment)
 	nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.daemonSet)
 	nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.service)
 	nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.hpa)
+	nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind], owned.serviceAccount)
 	nobjMngr.AddManagedObject(configMapName+FlpConfSuffix[confKind], owned.configMap)
 
+	if confKind == ConfSingle || confKind == ConfKafkaIngester {
+		nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind]+FlpConfSuffix[ConfKafkaIngester]+"role", owned.roleBindingIngester)
+	}
+	if confKind == ConfSingle || confKind == ConfKafkaTransformer {
+		nobjMngr.AddManagedObject(constants.FLPName+FlpConfSuffix[confKind]+FlpConfSuffix[ConfKafkaTransformer]+"role", owned.roleBindingIngester)
+	}
 	return singleDeploymentReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, confKind: confKind}
 }
 
@@ -173,6 +184,10 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
 		}
 	}
 
+	if err := r.reconcileAsServiceAccount(ctx, &builder); err != nil {
+		return err
+	}
+
 	switch desiredFLP.Kind {
 	case constants.DeploymentKind:
 		return r.reconcileAsDeployment(ctx, desiredFLP, &builder, configDigest)
@@ -252,59 +267,78 @@ func (r *singleDeploymentReconciler) reconcileAsDaemonSet(ctx context.Context, d
 	return nil
 }
 
-func (r *FLPReconciler) reconcilePermissions(ctx context.Context) error {
-	// Cluster role is only installed once
-	if err := r.reconcileClusterRole(ctx); err != nil {
-		return err
+func (r *singleDeploymentReconciler) reconcileAsServiceAccount(ctx context.Context, builder *builder) error {
+	if !r.nobjMngr.Exists(r.owned.serviceAccount) {
+		return r.CreateOwned(ctx, builder.serviceAccount())
+	} // We only configure name, update is not needed for now
+	if r.confKind == ConfKafkaIngester || r.confKind == ConfSingle {
+		if err := r.reconcileAsClusterRoleBinding(ctx, builder, ConfKafkaIngester); err != nil {
+			return err
+		}
 	}
-	// Service account has to be re-created when namespace changes (it is namespace-scoped)
-	if err := r.CreateOwned(ctx, buildServiceAccount(r.nobjMngr.Namespace)); err != nil {
-		return err
+	if r.confKind == ConfKafkaTransformer || r.confKind == ConfSingle {
+		if err := r.reconcileAsClusterRoleBinding(ctx, builder, ConfKafkaTransformer); err != nil {
+			return err
+		}
 	}
-	// Cluster role binding has to be updated when namespace changes (it is not namespace-scoped)
-	return r.reconcileClusterRoleBinding(ctx)
+	return nil
 }
 
-func (r *FLPReconciler) reconcileClusterRole(ctx context.Context) error {
-	desired := buildClusterRole()
-	actual := v1.ClusterRole{}
+func (r *singleDeploymentReconciler) reconcileAsClusterRoleBinding(ctx context.Context, builder *builder, roleKind string) error {
+	desired := builder.clusterRoleBinding(roleKind)
+	actual := rbacv1.ClusterRoleBinding{}
 	if err := r.Client.Get(ctx,
-		types.NamespacedName{Name: constants.FLPName},
+		types.NamespacedName{Name: desired.ObjectMeta.Name},
 		&actual,
 	); err != nil {
 		if errors.IsNotFound(err) {
 			return r.CreateOwned(ctx, desired)
 		}
-		return fmt.Errorf("can't reconcile %s ClusterRole: %w", constants.FLPName, err)
+		return fmt.Errorf("can't reconcile %s ClusterRoleBinding: %w", constants.FLPName, err)
 	}
-
 	if helper.IsSubSet(actual.Labels, desired.Labels) &&
-		reflect.DeepEqual(actual.Rules, desired.Rules) {
-		// cluster role already reconciled. Exiting
+		actual.RoleRef == desired.RoleRef &&
+		reflect.DeepEqual(actual.Subjects, desired.Subjects) {
+		if actual.RoleRef != desired.RoleRef {
+			//Roleref cannot be updated deleting and creating a new rolebinding
+			r.nobjMngr.TryDelete(ctx, &actual)
+			return r.CreateOwned(ctx, desired)
+		}
+		// cluster role binding already reconciled. Exiting
 		return nil
 	}
-
 	return r.UpdateOwned(ctx, &actual, desired)
 }
 
-func (r *FLPReconciler) reconcileClusterRoleBinding(ctx context.Context) error {
-	desired := buildClusterRoleBinding(r.nobjMngr.Namespace)
-	actual := v1.ClusterRoleBinding{}
+func (r *FLPReconciler) reconcilePermissions(ctx context.Context) error {
+	// Cluster role is only installed once
+	if err := r.reconcileClusterRole(ctx, buildClusterRoleIngester(), constants.FLPName+FlpConfSuffix[ConfKafkaIngester]); err != nil {
+		return err
+	}
+	if err := r.reconcileClusterRole(ctx, buildClusterRoleTransformer(), constants.FLPName+FlpConfSuffix[ConfKafkaTransformer]); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (r *FLPReconciler) reconcileClusterRole(ctx context.Context, desired *rbacv1.ClusterRole, name string) error {
+	actual := rbacv1.ClusterRole{}
 	if err := r.Client.Get(ctx,
-		types.NamespacedName{Name: constants.FLPName},
+		types.NamespacedName{Name: name},
 		&actual,
 	); err != nil {
 		if errors.IsNotFound(err) {
 			return r.CreateOwned(ctx, desired)
 		}
-		return fmt.Errorf("can't reconcile %s ClusterRoleBinding: %w", constants.FLPName, err)
+		return fmt.Errorf("can't reconcile %s ClusterRole: %w", name, err)
 	}
+
 	if helper.IsSubSet(actual.Labels, desired.Labels) &&
-		actual.RoleRef == desired.RoleRef &&
-		reflect.DeepEqual(actual.Subjects, desired.Subjects) {
-		// cluster role binding already reconciled. Exiting
+		reflect.DeepEqual(actual.Rules, desired.Rules) {
+		// cluster role already reconciled. Exiting
 		return nil
 	}
+
 	return r.UpdateOwned(ctx, &actual, desired)
 }