Skip to content

Commit

Permalink
Merge pull request kubernetes#12721 from wojtek-t/fix_cacher_filtering
Browse files Browse the repository at this point in the history
Store previous value in WatchCache for filtering
  • Loading branch information
fgrzadkowski committed Aug 18, 2015
2 parents 19f5266 + 3a71eb1 commit 8e2cad7
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 22 deletions.
47 changes: 37 additions & 10 deletions pkg/client/cache/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,27 @@ import (
"k8s.io/kubernetes/pkg/watch"
)

// TODO(wojtek-t): All structure in this file should be private to
// pkg/storage package. We should remove the reference to WatchCache
// from Reflector (by changing the Replace method signature in Store
// interface to take resource version too) and move it under pkg/storage.

// WatchCacheEvent is a single "watch event" that is send to users of
// WatchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
type WatchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
}

// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
resourceVersion uint64
event watch.Event
watchCacheEvent WatchCacheEvent
}

// WatchCache implements a Store interface.
Expand Down Expand Up @@ -66,8 +81,9 @@ type WatchCache struct {
// This handler is run at the end of every successful Replace() method.
onReplace func()

// This handler is run at the end of every Add/Update/Delete method.
onEvent func(watch.Event)
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(WatchCacheEvent)
}

func NewWatchCache(capacity int) *WatchCache {
Expand Down Expand Up @@ -140,16 +156,27 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
func (w *WatchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
w.Lock()
defer w.Unlock()
previous, exists, err := w.store.Get(event.Object)
if err != nil {
return err
}
var prevObject runtime.Object
if exists {
prevObject = previous.(runtime.Object)
} else {
prevObject = nil
}
watchCacheEvent := WatchCacheEvent{event.Type, event.Object, prevObject}
if w.onEvent != nil {
w.onEvent(event)
w.onEvent(watchCacheEvent)
}
w.updateCache(resourceVersion, event)
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
return updateFunc(event.Object)
}

// Assumes that lock is already held for write.
func (w *WatchCache) updateCache(resourceVersion uint64, event watch.Event) {
func (w *WatchCache) updateCache(resourceVersion uint64, event WatchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
Expand Down Expand Up @@ -219,13 +246,13 @@ func (w *WatchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace
}

func (w *WatchCache) SetOnEvent(onEvent func(watch.Event)) {
func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) {
w.Lock()
defer w.Unlock()
w.onEvent = onEvent
}

func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, error) {
func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) {
w.RLock()
defer w.RUnlock()

Expand All @@ -244,9 +271,9 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]watch.Event, e
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion >= resourceVersion
}
first := sort.Search(size, f)
result := make([]watch.Event, size-first)
result := make([]WatchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].event
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
}
return result, nil
}
60 changes: 60 additions & 0 deletions pkg/client/cache/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)

func makeTestPod(name string, resourceVersion uint64) *api.Pod {
Expand Down Expand Up @@ -108,6 +109,34 @@ func TestEvents(t *testing.T) {
store := NewWatchCache(5)

store.Add(makeTestPod("pod", 2))

// Test for Added event.
{
_, err := store.GetAllEventsSince(1)
if err == nil {
t.Errorf("expected error too old")
}
}
{
result, err := store.GetAllEventsSince(2)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 1 {
t.Fatalf("unexpected events: %v", result)
}
if result[0].Type != watch.Added {
t.Errorf("unexpected event type: %v", result[0].Type)
}
pod := makeTestPod("pod", uint64(2))
if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
}
if result[0].PrevObject != nil {
t.Errorf("unexpected item: %v", result[0].PrevObject)
}
}

store.Update(makeTestPod("pod", 3))
store.Update(makeTestPod("pod", 4))

Expand All @@ -127,10 +156,17 @@ func TestEvents(t *testing.T) {
t.Fatalf("unexpected events: %v", result)
}
for i := 0; i < 2; i++ {
if result[i].Type != watch.Modified {
t.Errorf("unexpected event type: %v", result[i].Type)
}
pod := makeTestPod("pod", uint64(i+3))
if !api.Semantic.DeepEqual(pod, result[i].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[i].Object, pod)
}
prevPod := makeTestPod("pod", uint64(i+2))
if !api.Semantic.DeepEqual(prevPod, result[i].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[i].PrevObject, prevPod)
}
}
}

Expand Down Expand Up @@ -160,4 +196,28 @@ func TestEvents(t *testing.T) {
}
}
}

// Test for delete event.
store.Delete(makeTestPod("pod", uint64(9)))

{
result, err := store.GetAllEventsSince(9)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(result) != 1 {
t.Fatalf("unexpected events: %v", result)
}
if result[0].Type != watch.Deleted {
t.Errorf("unexpected event type: %v", result[0].Type)
}
pod := makeTestPod("pod", uint64(9))
if !api.Semantic.DeepEqual(pod, result[0].Object) {
t.Errorf("unexpected item: %v, expected: %v", result[0].Object, pod)
}
prevPod := makeTestPod("pod", uint64(8))
if !api.Semantic.DeepEqual(prevPod, result[0].PrevObject) {
t.Errorf("unexpected item: %v, expected: %v", result[0].PrevObject, prevPod)
}
}
}
36 changes: 24 additions & 12 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *Cacher) List(key string, listObj runtime.Object) error {
return nil
}

func (c *Cacher) processEvent(event watch.Event) {
func (c *Cacher) processEvent(event cache.WatchCacheEvent) {
c.Lock()
defer c.Unlock()
for _, watcher := range c.watchers {
Expand Down Expand Up @@ -271,16 +271,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e
// cacherWatch implements watch.Interface
type cacheWatcher struct {
sync.Mutex
input chan watch.Event
input chan cache.WatchCacheEvent
result chan watch.Event
filter FilterFunc
stopped bool
forget func()
}

func newCacheWatcher(initEvents []watch.Event, filter FilterFunc, forget func()) *cacheWatcher {
func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watch.Event, 10),
input: make(chan cache.WatchCacheEvent, 10),
result: make(chan watch.Event, 10),
filter: filter,
stopped: false,
Expand Down Expand Up @@ -310,15 +310,29 @@ func (c *cacheWatcher) stop() {
}
}

func (c *cacheWatcher) add(event watch.Event) {
func (c *cacheWatcher) add(event cache.WatchCacheEvent) {
c.input <- event
}

func (c *cacheWatcher) process(initEvents []watch.Event) {
func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.PrevObject)
}
switch {
case curObjPasses && !oldObjPasses:
c.result <- watch.Event{watch.Added, event.Object}
case curObjPasses && oldObjPasses:
c.result <- watch.Event{watch.Modified, event.Object}
case !curObjPasses && oldObjPasses:
c.result <- watch.Event{watch.Deleted, event.Object}
}
}

func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) {
for _, event := range initEvents {
if c.filter(event.Object) {
c.result <- event
}
c.sendWatchCacheEvent(event)
}
defer close(c.result)
defer c.Stop()
Expand All @@ -327,8 +341,6 @@ func (c *cacheWatcher) process(initEvents []watch.Event) {
if !ok {
return
}
if c.filter(event.Object) {
c.result <- event
}
c.sendWatchCacheEvent(event)
}
}
Loading

0 comments on commit 8e2cad7

Please sign in to comment.