Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Use IndexerInformer rather than controller and queue (#51)
Browse files Browse the repository at this point in the history
This helps to simplify the implementation of the pod and namespace caches, as well as better handling errors from `cache.DeletedFinalStateUnknown` identified in #46 and more.
  • Loading branch information
pingles authored Apr 23, 2018
1 parent 0e23bcb commit 3de2fcb
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 173 deletions.
19 changes: 19 additions & 0 deletions pkg/k8s/listwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package k8s

import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

const (
// ResourcePods are Pod resources
ResourcePods = "pods"
// ResourceNamespaces are Namespace resources
ResourceNamespaces = "namespaces"
)

// NewListWatch creates a ListWatch for the specified Resource
func NewListWatch(client *kubernetes.Clientset, resource string) *cache.ListWatch {
return cache.NewListWatchFromClient(client.Core().RESTClient(), resource, "", fields.Everything())
}
9 changes: 8 additions & 1 deletion pkg/k8s/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ func PodFields(pod *v1.Pod) logrus.Fields {
"pod.status.ip": pod.Status.PodIP,
"pod.namespace": pod.ObjectMeta.Namespace,
"pod.name": pod.ObjectMeta.Name,
"pod.iam.role": pod.ObjectMeta.Annotations[IAMRoleKey],
"pod.iam.role": pod.ObjectMeta.Annotations[AnnotationIAMRoleKey],
"resource.version": pod.ObjectMeta.ResourceVersion,
"generation.metadata": pod.ObjectMeta.Generation,
}
}

func namespaceFields(n *v1.Namespace) logrus.Fields {
return logrus.Fields{
"namespace": n.Name,
"namespace.permitted": n.GetAnnotations()[AnnotationPermittedKey],
}
}
112 changes: 65 additions & 47 deletions pkg/k8s/namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,52 @@ package k8s

import (
"context"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"time"
)

const (
// AnnotationName hold the name of the annotation for the regex expressing the
// AnnotationPermittedKey hold the name of the annotation for the regex expressing the
// roles that can be assumed by pods in that namespace.
AnnotationName = "iam.amazonaws.com/permitted"
AnnotationPermittedKey = "iam.amazonaws.com/permitted"
)

// NamespaceCache implements NamespaceFinder interface used to determine which roles
// can be assumed by pods
type NamespaceCache struct {
store cache.Store
indexer cache.Indexer
controller cache.Controller
}

func namespaceFields(n *v1.Namespace) log.Fields {
return log.Fields{
"namespace": n.Name,
"namespace.permitted": n.GetAnnotations()[AnnotationName],
}
}

func (c *NamespaceCache) process(obj interface{}) error {
d := obj.(cache.Deltas).Newest()

ns := d.Object.(*v1.Namespace)
fields := log.Fields{
"cache.object": "namespace",
"cache.delta.type": d.Type,
}
log.WithFields(fields).WithFields(namespaceFields(ns)).Debugf("processing delta")

switch d.Type {
case cache.Sync:
return c.store.Add(d.Object)
case cache.Added:
return c.store.Add(d.Object)
case cache.Updated:
return c.store.Update(d.Object)
case cache.Deleted:
return c.store.Delete(d.Object)
}
return nil
}

// NewNamespaceCache creates the cache storing Namespaces
func NewNamespaceCache(source cache.ListerWatcher, syncInterval time.Duration) *NamespaceCache {
c := &NamespaceCache{}
c.store = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
config := &cache.Config{
Queue: cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, c.store),
ListerWatcher: source,
ObjectType: &v1.Namespace{},
FullResyncPeriod: syncInterval,
RetryOnError: false,
Process: c.process,
namespaceLogger := &namespaceLogger{}
indexer, controller := cache.NewIndexerInformer(source, &v1.Namespace{}, syncInterval, namespaceLogger, cache.Indexers{})
return &NamespaceCache{
indexer: indexer,
controller: controller,
}
c.controller = cache.New(config)
return c
}

func (c *NamespaceCache) Run(ctx context.Context) {
// Run starts the cache processing updates. Blocks until cache has synced
func (c *NamespaceCache) Run(ctx context.Context) error {
go c.controller.Run(ctx.Done())
log.Infof("started namespace cache controller")

ok := cache.WaitForCacheSync(ctx.Done(), c.controller.HasSynced)
if !ok {
return ErrWaitingForSync
}

return nil
}

// FindNamespace finds the Namespace by it's name
func (c *NamespaceCache) FindNamespace(ctx context.Context, name string) (*v1.Namespace, error) {
obj, exists, err := c.store.GetByKey(name)
obj, exists, err := c.indexer.GetByKey(name)
if err != nil {
return nil, err
}
Expand All @@ -94,3 +69,46 @@ func (c *NamespaceCache) FindNamespace(ctx context.Context, name string) (*v1.Na
}
return obj.(*v1.Namespace), nil
}

type namespaceLogger struct {
}

func (o *namespaceLogger) OnAdd(obj interface{}) {
namespace, isNamespace := obj.(*v1.Namespace)
if !isNamespace {
log.Errorf("OnAdd unexpected object: %+v", obj)
return
}
log.WithFields(namespaceFields(namespace)).Debugf("added namespace")
}

func (o *namespaceLogger) OnDelete(obj interface{}) {
namespace, isNamespace := obj.(*v1.Namespace)
if !isNamespace {
deletedObj, isDeleted := obj.(cache.DeletedFinalStateUnknown)
if !isDeleted {
log.Errorf("OnDelete unexpected object: %+v", obj)
return
}

namespace, isNamespace = deletedObj.Obj.(*v1.Namespace)
if !isNamespace {
log.Errorf("OnDelete unexpected DeletedFinalStateUnknown object: %+v", deletedObj.Obj)
}
log.WithFields(namespaceFields(namespace)).Debugf("deleted namespace")
return
}

log.WithFields(namespaceFields(namespace)).Debugf("deleted namespace")
return
}

func (o *namespaceLogger) OnUpdate(old, new interface{}) {
namespace, isNamespace := new.(*v1.Namespace)
if !isNamespace {
log.Errorf("OnUpdate unexpected object: %+v", new)
return
}

log.WithFields(namespaceFields(namespace)).Debugf("updated namespace")
}
Loading

0 comments on commit 3de2fcb

Please sign in to comment.