Skip to content

Commit

Permalink
NETOBSERV-1404: mitigate narrowcache weakness on missed events (#504)
Browse files Browse the repository at this point in the history
* NETOBSERV-1404 mitigate narrowcache weakness on missed events

narrowcache might contain outdated data in case of missed events, add a
mechanism to invalidate entries on errors

* Fix other potential issues:

- defer Unlocks
- in case where a watch stops, remove the cache entry, so a new "get"
  would recreate it
  • Loading branch information
jotak authored Dec 2, 2023
1 parent 654de83 commit 7ad130e
Showing 1 changed file with 62 additions and 8 deletions.
70 changes: 62 additions & 8 deletions pkg/narrowcache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
Expand All @@ -23,9 +24,9 @@ import (
type Client struct {
client.Client
liveClient kubernetes.Interface
watchedGVKs map[string]GVKInfo
watchedObjects map[string]*watchedObject
wmut sync.RWMutex
watchedGVKs map[string]GVKInfo // read only once init
watchedObjects map[string]*watchedObject // mutex'ed
wmut sync.RWMutex // for watchedObjects map
}

type watchedObject struct {
Expand Down Expand Up @@ -137,6 +138,8 @@ func (c *Client) updateCache(ctx context.Context, key string, watcher watch.Inte
}
c.callHandlers(ctx, key, watchEvent)
}
rlog.WithValues("key", key).Info("Watch terminated. Clearing cache entry.")
c.clearEntryByKey(key)
}

func (c *Client) setToCache(key string, obj runtime.Object) error {
Expand All @@ -145,29 +148,29 @@ func (c *Client) setToCache(key string, obj runtime.Object) error {
return fmt.Errorf("could not convert runtime.Object to client.Object")
}
c.wmut.Lock()
defer c.wmut.Unlock()
if ca := c.watchedObjects[key]; ca != nil {
ca.cached = cObj
} else {
c.watchedObjects[key] = &watchedObject{cached: cObj}
}
c.wmut.Unlock()
return nil
}

func (c *Client) removeFromCache(key string) {
c.wmut.Lock()
defer c.wmut.Unlock()
if ca := c.watchedObjects[key]; ca != nil {
ca.cached = nil
}
c.wmut.Unlock()
}

func (c *Client) addHandler(key string, hoq handlerOnQueue) {
c.wmut.Lock()
defer c.wmut.Unlock()
if ca := c.watchedObjects[key]; ca != nil {
ca.handlers = append(ca.handlers, hoq)
}
c.wmut.Unlock()
}

func (c *Client) callHandlers(ctx context.Context, key string, ev watch.Event) {
Expand Down Expand Up @@ -197,12 +200,13 @@ func (c *Client) callHandlers(ctx context.Context, key string, ev watch.Event) {
return
}
c.wmut.RLock()
defer c.wmut.RUnlock()
if ca := c.watchedObjects[key]; ca != nil {
for _, hoq := range ca.handlers {
fn(hoq)
h := hoq
go fn(h)
}
}
c.wmut.RUnlock()
}

func (c *Client) GetSource(ctx context.Context, obj client.Object) (source.Source, error) {
Expand Down Expand Up @@ -230,3 +234,53 @@ func (c *Client) GetSource(ctx context.Context, obj client.Object) (source.Sourc
},
}, nil
}

func (c *Client) clearEntry(ctx context.Context, obj client.Object) {
key := types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}
gvk, _ := c.GroupVersionKindFor(obj)
strGVK := gvk.String()
if _, managed := c.watchedGVKs[strGVK]; managed {
log.FromContext(ctx).
WithName("narrowcache").
WithValues("name", obj.GetName(), "namespace", obj.GetNamespace()).
Info("Invalidating cache entry")
strGVK := gvk.String()
objKey := strGVK + "|" + key.String()
c.clearEntryByKey(objKey)
}
}

func (c *Client) clearEntryByKey(key string) {
// Note that this doesn't remove the watch, which lives in a goroutine
// Watch would recreate cache object on received event, or it can be recreated on subsequent Get call
c.wmut.Lock()
defer c.wmut.Unlock()
delete(c.watchedObjects, key)
}

func (c *Client) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if err := c.Client.Create(ctx, obj, opts...); err != nil {
// might be due to an outdated cache, clear the corresponding entry
c.clearEntry(ctx, obj)
return err
}
return nil
}

func (c *Client) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
if err := c.Client.Delete(ctx, obj, opts...); err != nil {
// might be due to an outdated cache, clear the corresponding entry
c.clearEntry(ctx, obj)
return err
}
return nil
}

func (c *Client) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if err := c.Client.Update(ctx, obj, opts...); err != nil {
// might be due to an outdated cache, clear the corresponding entry
c.clearEntry(ctx, obj)
return err
}
return nil
}

0 comments on commit 7ad130e

Please sign in to comment.