Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce CacheManager #2785

Merged
merged 24 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions pkg/controller/config/config_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
syncutil "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
cm "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil/cachemanager"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -109,16 +111,14 @@ func (a *Adder) InjectWatchSet(watchSet *watch.Set) {
// events is the channel from which sync controller will receive the events
// regEvents is the channel registered by Registrar to put the events in
// events and regEvents point to same event channel except for testing.
func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker, processExcluder *process.Excluder, events <-chan event.GenericEvent, watchSet *watch.Set, regEvents chan<- event.GenericEvent) (*ReconcileConfig, error) {
filteredOpa := syncc.NewFilteredOpaDataClient(opa, watchSet)
syncMetricsCache := syncc.NewMetricsCache()
func newReconciler(mgr manager.Manager, opa syncutil.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker, processExcluder *process.Excluder, events <-chan event.GenericEvent, watchSet *watch.Set, regEvents chan<- event.GenericEvent) (*ReconcileConfig, error) {
filteredOpa := syncutil.NewFilteredOpaDataClient(opa, watchSet)
syncMetricsCache := syncutil.NewMetricsCache()
cm := cm.NewCacheManager(opa, syncMetricsCache, tracker, processExcluder)

syncAdder := syncc.Adder{
Opa: filteredOpa,
Events: events,
MetricsCache: syncMetricsCache,
Tracker: tracker,
ProcessExcluder: processExcluder,
Events: events,
CacheManager: cm,
}
// Create subordinate controller - we will feed it events dynamically via watch
if err := syncAdder.Add(mgr); err != nil {
Expand Down Expand Up @@ -176,8 +176,8 @@ type ReconcileConfig struct {
statusClient client.StatusClient

scheme *runtime.Scheme
opa syncc.OpaDataClient
syncMetricsCache *syncc.MetricsCache
opa syncutil.OpaDataClient
syncMetricsCache *syncutil.MetricsCache
cs *watch.ControllerSwitch
watcher *watch.Registrar

Expand Down Expand Up @@ -327,7 +327,7 @@ func (r *ReconcileConfig) wipeCacheIfNeeded(ctx context.Context) error {

// reset sync cache before sending the metric
r.syncMetricsCache.ResetCache()
r.syncMetricsCache.ReportSync(&syncc.Reporter{})
r.syncMetricsCache.ReportSync()

r.needsWipe = false
}
Expand All @@ -352,10 +352,10 @@ func (r *ReconcileConfig) replayData(ctx context.Context) error {
return fmt.Errorf("replaying data for %+v: %w", gvk, err)
}

defer r.syncMetricsCache.ReportSync(&syncc.Reporter{})
defer r.syncMetricsCache.ReportSync()

for i := range u.Items {
syncKey := r.syncMetricsCache.GetSyncKey(u.Items[i].GetNamespace(), u.Items[i].GetName())
syncKey := syncutil.GetKeyForSyncMetrics(u.Items[i].GetNamespace(), u.Items[i].GetName())

isExcludedNamespace, err := r.skipExcludedNamespace(&u.Items[i])
if err != nil {
Expand All @@ -367,14 +367,14 @@ func (r *ReconcileConfig) replayData(ctx context.Context) error {
}

if _, err := r.opa.AddData(ctx, &u.Items[i]); err != nil {
r.syncMetricsCache.AddObject(syncKey, syncc.Tags{
r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{
Kind: u.Items[i].GetKind(),
Status: metrics.ErrorStatus,
})
return fmt.Errorf("adding data for %+v: %w", gvk, err)
}

r.syncMetricsCache.AddObject(syncKey, syncc.Tags{
r.syncMetricsCache.AddObject(syncKey, syncutil.Tags{
Kind: u.Items[i].GetKind(),
Status: metrics.ActiveStatus,
})
Expand Down
26 changes: 13 additions & 13 deletions pkg/controller/config/config_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func TestConfig_CacheContents(t *testing.T) {
mgr, wm := setupManager(t)
c := testclient.NewRetryClient(mgr.GetClient())

opaClient := &fakeOpa{}
opaClient := &fakes.FakeOpa{}
cs := watch.NewSwitch()
tracker, err := readiness.SetupTracker(mgr, false, false, false)
if err != nil {
Expand Down Expand Up @@ -503,9 +503,9 @@ func TestConfig_CacheContents(t *testing.T) {
}
}()

expected := map[opaKey]interface{}{
{gvk: nsGVK, key: "default"}: nil,
{gvk: configMapGVK, key: "default/config-test-1"}: nil,
expected := map[fakes.OpaKey]interface{}{
{Gvk: nsGVK, Key: "default"}: nil,
{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
// kube-system namespace is being excluded, it should not be in opa cache
}
g.Eventually(func() bool {
Expand Down Expand Up @@ -535,20 +535,20 @@ func TestConfig_CacheContents(t *testing.T) {

// Expect our configMap to return at some point
// TODO: In the future it will remain instead of having to repopulate.
expected = map[opaKey]interface{}{
expected = map[fakes.OpaKey]interface{}{
{
gvk: configMapGVK,
key: "default/config-test-1",
Gvk: configMapGVK,
Key: "default/config-test-1",
}: nil,
}
g.Eventually(func() bool {
return opaClient.Contains(expected)
}, 10*time.Second).Should(gomega.BeTrue(), "waiting for ConfigMap to repopulate in cache")

expected = map[opaKey]interface{}{
expected = map[fakes.OpaKey]interface{}{
{
gvk: configMapGVK,
key: "kube-system/config-test-2",
Gvk: configMapGVK,
Key: "kube-system/config-test-2",
}: nil,
}
g.Eventually(func() bool {
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestConfig_Retries(t *testing.T) {
mgr, wm := setupManager(t)
c := testclient.NewRetryClient(mgr.GetClient())

opaClient := &fakeOpa{}
opaClient := &fakes.FakeOpa{}
cs := watch.NewSwitch()
tracker, err := readiness.SetupTracker(mgr, false, false, false)
if err != nil {
Expand Down Expand Up @@ -665,8 +665,8 @@ func TestConfig_Retries(t *testing.T) {
}
}()

expected := map[opaKey]interface{}{
{gvk: configMapGVK, key: "default/config-test-1"}: nil,
expected := map[fakes.OpaKey]interface{}{
{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
}
g.Eventually(func() bool {
return opaClient.Contains(expected)
Expand Down
100 changes: 0 additions & 100 deletions pkg/controller/config/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,110 +17,10 @@ package config

import (
"context"
"fmt"
gosync "sync"

constraintTypes "github.com/open-policy-agent/frameworks/constraint/pkg/types"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type opaKey struct {
gvk schema.GroupVersionKind
key string
}

// fakeOpa is an OpaDataClient for testing.
type fakeOpa struct {
mu gosync.Mutex
data map[opaKey]interface{}
}

// keyFor returns an opaKey for the provided resource.
// Returns error if the resource is not a runtime.Object w/ metadata.
func (f *fakeOpa) keyFor(obj interface{}) (opaKey, error) {
o, ok := obj.(client.Object)
if !ok {
return opaKey{}, fmt.Errorf("expected runtime.Object, got: %T", obj)
}
gvk := o.GetObjectKind().GroupVersionKind()
ns := o.GetNamespace()
if ns == "" {
return opaKey{gvk: gvk, key: o.GetName()}, nil
}

return opaKey{gvk: gvk, key: fmt.Sprintf("%s/%s", ns, o.GetName())}, nil
}

func (f *fakeOpa) AddData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) {
f.mu.Lock()
defer f.mu.Unlock()

key, err := f.keyFor(data)
if err != nil {
return nil, err
}

if f.data == nil {
f.data = make(map[opaKey]interface{})
}

f.data[key] = data
return &constraintTypes.Responses{}, nil
}

func (f *fakeOpa) RemoveData(ctx context.Context, data interface{}) (*constraintTypes.Responses, error) {
f.mu.Lock()
defer f.mu.Unlock()

if target.IsWipeData(data) {
f.data = make(map[opaKey]interface{})
return &constraintTypes.Responses{}, nil
}

key, err := f.keyFor(data)
if err != nil {
return nil, err
}

delete(f.data, key)
return &constraintTypes.Responses{}, nil
}

// Contains returns true if all expected resources are in the cache.
func (f *fakeOpa) Contains(expected map[opaKey]interface{}) bool {
f.mu.Lock()
defer f.mu.Unlock()

for k := range expected {
if _, ok := f.data[k]; !ok {
return false
}
}
return true
}

// HasGVK returns true if the cache has any data of the requested kind.
func (f *fakeOpa) HasGVK(gvk schema.GroupVersionKind) bool {
f.mu.Lock()
defer f.mu.Unlock()

for k := range f.data {
if k.gvk == gvk {
return true
}
}
return false
}

// Len returns the number of items in the cache.
func (f *fakeOpa) Len() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.data)
}

// hookReader is a client.Reader with overrideable methods.
type hookReader struct {
client.Reader
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/constraint/constraint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func (r *ReconcileConstraint) cacheConstraint(ctx context.Context, instance *uns

// Track for readiness
t.Observe(instance)
log.Info("[readiness] observed Constraint", "name", instance.GetName())

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ func (r *ReconcileConstraintTemplate) handleUpdate(
// Mark for readiness tracking
t := r.tracker.For(gvkConstraintTemplate)
t.Observe(unversionedCT)
logger.Info("[readiness] observed ConstraintTemplate", "name", unversionedCT.GetName())

var newCRD *apiextensionsv1.CustomResourceDefinition
if currentCRD == nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/expansion/expansion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (

upsertErr := r.system.UpsertTemplate(et)
if upsertErr == nil {
log.Info("[readiness] observed ExpansionTemplate", "template name", et.GetName())
r.getTracker().Observe(versionedET)
r.registry.add(request.NamespacedName, metrics.ActiveStatus)
} else {
Expand Down
Loading