refactor gc controller

Signed-off-by: Zhiwei Yin <[email protected]>
zhiweiyin318 committed Jul 24, 2023
1 parent 8d974c2 commit 8fbb20d

Showing 10 changed files with 356 additions and 99 deletions.
12 changes: 12 additions & 0 deletions pkg/registration/helpers/testing/testinghelpers.go
@@ -230,6 +230,18 @@ func NewRoleBinding(namespace, name string, finalizers []string, labels map[stri
 	return rolebinding
 }
 
+func NewClusterRole(name string, finalizers []string, labels map[string]string, terminated bool) *rbacv1.ClusterRole {
+	clusterRole := &rbacv1.ClusterRole{}
+	clusterRole.Name = name
+	clusterRole.Finalizers = finalizers
+	clusterRole.Labels = labels
+	if terminated {
+		now := metav1.Now()
+		clusterRole.DeletionTimestamp = &now
+	}
+	return clusterRole
+}
+
 func NewResourceList(cpu, mem int) corev1.ResourceList {
 	return corev1.ResourceList{
 		corev1.ResourceCPU: *resource.NewQuantity(int64(cpu), resource.DecimalExponent),
18 changes: 3 additions & 15 deletions pkg/registration/hub/clusterrole/controller.go
@@ -76,24 +76,12 @@ func (c *clusterroleController) sync(ctx context.Context, syncCtx factory.SyncCo
 		return err
 	}
 
-	errs := []error{}
-	// Clean up managedcluster clusterroles if there are no managed clusters
+	// gc controller will handle the clusterroles cleanup
 	if len(managedClusters) == 0 {
-		results := resourceapply.DeleteAll(
-			ctx,
-			resourceapply.NewKubeClientHolder(c.kubeClient),
-			c.eventRecorder,
-			manifestFiles.ReadFile,
-			clusterRoleFiles...,
-		)
-		for _, result := range results {
-			if result.Error != nil {
-				errs = append(errs, fmt.Errorf("%q (%T): %v", result.File, result.Type, result.Error))
-			}
-		}
-		return operatorhelpers.NewMultiLineAggregate(errs)
+		return nil
 	}
 
+	errs := []error{}
 	// Make sure the managedcluster clusterroles exist if there are clusters
 	results := c.applier.Apply(
 		ctx,
18 changes: 0 additions & 18 deletions pkg/registration/hub/clusterrole/controller_test.go
@@ -8,7 +8,6 @@ import (
 	"github.com/openshift/library-go/pkg/operator/events/eventstesting"
 	"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
 	rbacv1 "k8s.io/api/rbac/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	kubeinformers "k8s.io/client-go/informers"
 	kubefake "k8s.io/client-go/kubernetes/fake"
@@ -45,23 +44,6 @@ func TestSyncManagedClusterClusterRole(t *testing.T) {
 				}
 			},
 		},
-		{
-			name:     "delete clusterroles",
-			clusters: []runtime.Object{},
-			clusterroles: []runtime.Object{
-				&rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: "open-cluster-management:managedcluster:registration"}},
-				&rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: "open-cluster-management:managedcluster:work"}},
-			},
-			validateActions: func(t *testing.T, actions []clienttesting.Action) {
-				testingcommon.AssertActions(t, actions, "delete", "delete")
-				if actions[0].(clienttesting.DeleteActionImpl).Name != "open-cluster-management:managedcluster:registration" {
-					t.Errorf("expected registration clusterrole, but failed")
-				}
-				if actions[1].(clienttesting.DeleteActionImpl).Name != "open-cluster-management:managedcluster:work" {
-					t.Errorf("expected work clusterrole, but failed")
-				}
-			},
-		},
 	}
 
 	for _, c := range cases {
89 changes: 89 additions & 0 deletions pkg/registration/hub/gc/controller.go
@@ -0,0 +1,89 @@
package gc

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/controller/factory"
	"github.com/openshift/library-go/pkg/operator/events"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	corev1informers "k8s.io/client-go/informers/core/v1"
	rbacv1client "k8s.io/client-go/kubernetes/typed/rbac/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	rbacv1listers "k8s.io/client-go/listers/rbac/v1"
	"k8s.io/client-go/tools/cache"

	informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
	clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
	worklister "open-cluster-management.io/api/client/work/listers/work/v1"

	"open-cluster-management.io/ocm/pkg/common/queue"
)

// gcReconcile is an interface for reconciling cleanup logic.
type gcReconcile interface {
	reconcile(ctx context.Context, clusterName string) (bool, error)
}

type GCController struct {
	clusterRoleLister  rbacv1listers.ClusterRoleLister
	roleBindingLister  rbacv1listers.RoleBindingLister
	rbacClient         rbacv1client.RbacV1Interface
	clusterLister      clusterv1listers.ManagedClusterLister
	namespaceLister    corelisters.NamespaceLister
	manifestWorkLister worklister.ManifestWorkLister
	eventRecorder      events.Recorder
	gcReconciles       []gcReconcile
}

// NewGCController ensures the related resources are cleaned up after a cluster is deleted.
func NewGCController(
	clusterRoleLister rbacv1listers.ClusterRoleLister,
	roleBindingLister rbacv1listers.RoleBindingLister,
	namespaceInformer corev1informers.NamespaceInformer,
	clusterInformer informerv1.ManagedClusterInformer,
	manifestWorkLister worklister.ManifestWorkLister,
	rbacClient rbacv1client.RbacV1Interface,
	eventRecorder events.Recorder,
) factory.Controller {

	controller := &GCController{
		gcReconciles: []gcReconcile{
			newGCRoleBindingController(roleBindingLister, namespaceInformer, clusterInformer, manifestWorkLister, rbacClient, eventRecorder),
			newGCClusterRoleController(clusterRoleLister, clusterInformer, manifestWorkLister, rbacClient, eventRecorder),
		},
	}
	return factory.New().
		WithInformersQueueKeysFunc(queue.QueueKeyByMetaNamespaceName, clusterInformer.Informer(), namespaceInformer.Informer()).
		WithSync(controller.sync).ToController("GCController", eventRecorder)
}

func (r *GCController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
	key := controllerContext.QueueKey()
	if key == "" {
		return nil
	}

	_, clusterName, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return nil
	}

	var errs []error
	var requeue = false
	for _, reconciler := range r.gcReconciles {
		requeue, err = reconciler.reconcile(ctx, clusterName)
		if err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) != 0 {
		return utilerrors.NewAggregate(errs)
	}

	if requeue {
		controllerContext.Queue().AddAfter(clusterName, 10*time.Second)
	}

	return nil
}
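
For orientation, here is a minimal sketch of how NewGCController might be wired from shared informer factories, mirroring the fakes used in the tests below. The factory and client variable names, and the surrounding hub bootstrap, are assumptions rather than part of this commit:

	// Hypothetical wiring; kubeInformers, clusterInformers, workInformers,
	// kubeClient, recorder, and ctx are assumed to exist in the hub bootstrap.
	gcController := gc.NewGCController(
		kubeInformers.Rbac().V1().ClusterRoles().Lister(),
		kubeInformers.Rbac().V1().RoleBindings().Lister(),
		kubeInformers.Core().V1().Namespaces(),
		clusterInformers.Cluster().V1().ManagedClusters(),
		workInformers.Work().V1().ManifestWorks().Lister(),
		kubeClient.RbacV1(),
		recorder,
	)
	go gcController.Run(ctx, 1) // a factory.Controller runs until ctx is cancelled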
93 changes: 93 additions & 0 deletions pkg/registration/hub/gc/gc_clusterrole.go
@@ -0,0 +1,93 @@
package gc

import (
	"context"

	"github.com/openshift/library-go/pkg/operator/events"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	rbacv1client "k8s.io/client-go/kubernetes/typed/rbac/v1"
	rbacv1listers "k8s.io/client-go/listers/rbac/v1"
	"k8s.io/klog/v2"

	informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
	clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
	worklister "open-cluster-management.io/api/client/work/listers/work/v1"
)

const (
	registrationClusterRole = "open-cluster-management:managedcluster:registration"
	workClusterRole         = "open-cluster-management:managedcluster:work"
)

type gcClusterRoleController struct {
	clusterRoleLister  rbacv1listers.ClusterRoleLister
	rbacClient         rbacv1client.RbacV1Interface
	clusterLister      clusterv1listers.ManagedClusterLister
	manifestWorkLister worklister.ManifestWorkLister
	eventRecorder      events.Recorder
}

// newGCClusterRoleController ensures the clusterRoles for the registration and work
// agents are deleted only after all managedClusters and manifestWorks are deleted.
func newGCClusterRoleController(
	clusterRoleLister rbacv1listers.ClusterRoleLister,
	clusterInformer informerv1.ManagedClusterInformer,
	manifestWorkLister worklister.ManifestWorkLister,
	rbacClient rbacv1client.RbacV1Interface,
	eventRecorder events.Recorder,
) *gcClusterRoleController {

	return &gcClusterRoleController{
		clusterRoleLister:  clusterRoleLister,
		clusterLister:      clusterInformer.Lister(),
		manifestWorkLister: manifestWorkLister,
		rbacClient:         rbacClient,
		eventRecorder:      eventRecorder,
	}
}

func (r *gcClusterRoleController) reconcile(ctx context.Context, clusterName string) (bool, error) {
	clusters, err := r.clusterLister.List(labels.Everything())
	if err != nil && !errors.IsNotFound(err) {
		return false, err
	}
	if len(clusters) != 0 {
		return false, nil
	}

	_, err = r.clusterRoleLister.Get(registrationClusterRole)
	switch {
	case err == nil:
		err = r.rbacClient.ClusterRoles().Delete(ctx, registrationClusterRole, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return false, err
		}
	case err != nil && !errors.IsNotFound(err):
		return false, err
	}

	works, err := r.manifestWorkLister.List(labels.Everything())
	if err != nil && !errors.IsNotFound(err) {
		return false, err
	}
	if len(works) != 0 {
		klog.Warningf("There are no clusters; waiting for all manifestWorks to be cleaned up.")
		return true, nil
	}

	_, err = r.clusterRoleLister.Get(workClusterRole)
	switch {
	case err == nil:
		err = r.rbacClient.ClusterRoles().Delete(ctx, workClusterRole, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return false, err
		}
	case err != nil && !errors.IsNotFound(err):
		return false, err
	}

	return false, nil
}
113 changes: 113 additions & 0 deletions pkg/registration/hub/gc/gc_clusterrole_test.go
@@ -0,0 +1,113 @@
package gc

import (
	"context"
	"testing"
	"time"

	"github.com/openshift/library-go/pkg/operator/events"
	"github.com/stretchr/testify/assert"
	"k8s.io/apimachinery/pkg/runtime"
	kubeinformers "k8s.io/client-go/informers"
	fakeclient "k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"

	fakeclusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
	clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
	fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
	workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
	clusterv1 "open-cluster-management.io/api/cluster/v1"

	testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
	testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)

func TestClusterRoleReconcile(t *testing.T) {
	cases := []struct {
		name            string
		key             string
		clusters        []runtime.Object
		works           []runtime.Object
		expectedRequeue bool
		validateActions func(t *testing.T, actions []clienttesting.Action)
	}{
		{
			name:     "not all clusters are deleted",
			key:      testinghelpers.TestManagedClusterName,
			clusters: []runtime.Object{testinghelpers.NewManagedCluster()},
			validateActions: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertNoActions(t, actions)
			},
		},
		{
			name:            "there are no clusters, but manifestWorks remain",
			key:             testinghelpers.TestManagedClusterName,
			works:           []runtime.Object{testinghelpers.NewManifestWork(testinghelpers.TestManagedClusterName, "work1", []string{manifestWorkFinalizer}, nil)},
			expectedRequeue: true,
			validateActions: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "delete")
			},
		},
		{
			name:            "there are no clusters and no manifestWorks",
			key:             testinghelpers.TestManagedClusterName,
			expectedRequeue: false,
			validateActions: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "delete", "delete")
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			kubeClient := fakeclient.NewSimpleClientset()
			kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)

			clusterRoleStore := kubeInformerFactory.Rbac().V1().ClusterRoles().Informer().GetStore()
			if err := clusterRoleStore.Add(testinghelpers.NewClusterRole(registrationClusterRole,
				[]string{}, map[string]string{clusterv1.ClusterNameLabelKey: ""}, false)); err != nil {
				t.Fatal(err)
			}
			if err := clusterRoleStore.Add(testinghelpers.NewClusterRole(workClusterRole,
				[]string{}, map[string]string{clusterv1.ClusterNameLabelKey: ""}, false)); err != nil {
				t.Fatal(err)
			}

			clusterClient := fakeclusterclient.NewSimpleClientset()
			clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
			clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
			for _, cluster := range c.clusters {
				if err := clusterStore.Add(cluster); err != nil {
					t.Fatal(err)
				}
			}
			workClient := fakeworkclient.NewSimpleClientset()
			workInformerFactory := workinformers.NewSharedInformerFactory(workClient, 5*time.Minute)
			workStore := workInformerFactory.Work().V1().ManifestWorks().Informer().GetStore()
			for _, work := range c.works {
				if err := workStore.Add(work); err != nil {
					t.Fatal(err)
				}
			}

			_ = newGCClusterRoleController(
				kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
				clusterInformerFactory.Cluster().V1().ManagedClusters(),
				workInformerFactory.Work().V1().ManifestWorks().Lister(),
				kubeClient.RbacV1(),
				events.NewInMemoryRecorder(""),
			)

			ctrl := &gcClusterRoleController{
				clusterRoleLister:  kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
				clusterLister:      clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
				manifestWorkLister: workInformerFactory.Work().V1().ManifestWorks().Lister(),
				rbacClient:         kubeClient.RbacV1(),
				eventRecorder:      events.NewInMemoryRecorder(""),
			}
			requeue, err := ctrl.reconcile(context.TODO(), c.key)
			testingcommon.AssertError(t, err, "")
			assert.Equal(t, requeue, c.expectedRequeue)
			c.validateActions(t, kubeClient.Actions())
		})
	}
}
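
To run the new gc tests locally, a standard Go toolchain invocation from the repository root should work (a usage note, not part of the commit):

	go test ./pkg/registration/hub/gc/... -run TestClusterRoleReconcile -v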