diff --git a/common/cache/cache.go b/common/cache/cache.go index 74c08e53384..e80ac365a87 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -79,9 +79,10 @@ type Iterator interface { // Close closes the iterator // and releases any allocated resources Close() - // Entries returns a channel of MapEntry - // objects that can be used in a range loop - Entries() <-chan Entry + // HasNext return true if there is more items to be returned + HasNext() bool + // Next return the next item + Next() Entry } // Entry represents a key-value entry within the map diff --git a/common/cache/lru.go b/common/cache/lru.go index d40e7603699..9bbbfafcb5f 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -45,8 +45,8 @@ type ( } iteratorImpl struct { - stopCh chan struct{} - dataCh chan Entry + lru *lru + nextItem *list.Element } entryImpl struct { @@ -59,12 +59,41 @@ type ( // Close closes the iterator func (it *iteratorImpl) Close() { - close(it.stopCh) + it.lru.mut.Unlock() } -// Entries returns a channel of map entries -func (it *iteratorImpl) Entries() <-chan Entry { - return it.dataCh +// HasNext return true if there is more items to be returned +func (it *iteratorImpl) HasNext() bool { + for it.nextItem != nil { + entry := it.nextItem.Value.(*entryImpl) + if !it.lru.isEntryExpired(entry) { + // Entry is valid + return true + } + + nextItem := it.nextItem.Next() + it.lru.deleteInternal(it.nextItem) + it.nextItem = nextItem + } + + return false +} + +// Next return the next item +func (it *iteratorImpl) Next() Entry { + if !it.HasNext() { + panic("LRU cache iterator Next called when there is no next item") + } + + entry := it.nextItem.Value.(*entryImpl) + it.nextItem = it.nextItem.Next() + // make a copy of the entry so there will be no concurrent access to this entry + entry = &entryImpl{ + key: entry.key, + value: entry.value, + timestamp: entry.timestamp, + } + return entry } // Iterator returns an iterator to the map. This map @@ -73,37 +102,11 @@ func (it *iteratorImpl) Entries() <-chan Entry { func (c *lru) Iterator() Iterator { iterator := &iteratorImpl{ - dataCh: make(chan Entry, 8), - stopCh: make(chan struct{}), + lru: c, + nextItem: c.byAccess.Front(), } - go func(iterator *iteratorImpl) { - c.mut.Lock() - for _, element := range c.byKey { - entry := element.Value.(*entryImpl) - if c.isEntryExpired(entry) { - // Entry has expired - c.deleteInternal(element) - continue - } - // make a copy of the entry so there will be no concurrent access to this entry - entry = &entryImpl{ - key: entry.key, - value: entry.value, - timestamp: entry.timestamp, - } - select { - case iterator.dataCh <- entry: - case <-iterator.stopCh: - c.mut.Unlock() - close(iterator.dataCh) - return - } - } - c.mut.Unlock() - close(iterator.dataCh) - }(iterator) - + c.mut.Lock() return iterator } diff --git a/common/cache/lru_test.go b/common/cache/lru_test.go index b7e898120d9..302b0cb51c1 100644 --- a/common/cache/lru_test.go +++ b/common/cache/lru_test.go @@ -139,10 +139,11 @@ func TestLRUCacheConcurrentAccess(t *testing.T) { for j := 0; j < 50; j++ { result := []Entry{} it := cache.Iterator() - defer it.Close() - for entry := range it.Entries() { + for it.HasNext() { + entry := it.Next() result = append(result, entry) } + it.Close() } }() } @@ -215,9 +216,10 @@ func TestIterator(t *testing.T) { actual := map[string]string{} - ite := cache.Iterator() - defer ite.Close() - for entry := range ite.Entries() { + it := cache.Iterator() + defer it.Close() + for it.HasNext() { + entry := it.Next() actual[entry.Key().(string)] = entry.Value().(string) } diff --git a/common/convert.go b/common/convert.go index 310d0637ba4..8d8821488da 100644 --- a/common/convert.go +++ b/common/convert.go @@ -61,9 +61,9 @@ func BoolPtr(v bool) *bool { return &v } -// TimePtr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format. -func TimePtr(v time.Time) *int64 { - time := v.UTC().UnixNano() +// TimeInt64Ptr makes a copy and returns the pointer to a string for time.Time, as ISO 8601 format. +func TimeInt64Ptr(v time.Time) *int64 { + time := v.UnixNano() return &time } diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 223732e5cd0..fe71caad83e 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -471,7 +471,7 @@ func (e *matchingEngineImpl) DescribeTaskList(ctx context.Context, request *m.De for _, poller := range tlMgr.GetAllPollerInfo() { pollers = append(pollers, &workflow.PollerInfo{ Identity: common.StringPtr(poller.identity), - Timestamp: common.TimePtr(poller.timestamp), + Timestamp: common.TimeInt64Ptr(poller.timestamp), }) } return &workflow.DescribeTaskListResponse{Pollers: pollers}, nil diff --git a/service/matching/pollerHistory.go b/service/matching/pollerHistory.go index 910558087e2..a737413f0cb 100644 --- a/service/matching/pollerHistory.go +++ b/service/matching/pollerHistory.go @@ -52,10 +52,11 @@ type pollerHistory struct { } func newPollerHistory() *pollerHistory { - opts := &cache.Options{} - opts.InitialCapacity = pollerHistoryInitSize - opts.TTL = pollerHistoryTTL - opts.Pin = false + opts := &cache.Options{ + InitialCapacity: pollerHistoryInitSize, + TTL: pollerHistoryTTL, + Pin: false, + } return &pollerHistory{ history: cache.New(pollerHistoryInitMaxSize, opts), @@ -71,7 +72,8 @@ func (pollers *pollerHistory) getAllPollerInfo() []*pollerInfo { ite := pollers.history.Iterator() defer ite.Close() - for entry := range ite.Entries() { + for ite.HasNext() { + entry := ite.Next() key := entry.Key().(pollerIdentity) timestamp := entry.Timestamp() result = append(result, &pollerInfo{