Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor timecache implementations #523

Merged
merged 7 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ type PubSub struct {
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream

seenMessagesMx sync.Mutex
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy
Expand Down Expand Up @@ -567,6 +566,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
}
p.peers = nil
p.topics = nil
p.seenMessages.Done()
}()

for {
Expand Down Expand Up @@ -985,22 +985,13 @@ func (p *PubSub) notifySubs(msg *Message) {

// seenMessage returns whether we already saw this message before
func (p *PubSub) seenMessage(id string) bool {
p.seenMessagesMx.Lock()
defer p.seenMessagesMx.Unlock()
return p.seenMessages.Has(id)
}

// markSeen marks a message as seen such that seenMessage returns `true' for the given id
// returns true if the message was freshly marked
func (p *PubSub) markSeen(id string) bool {
p.seenMessagesMx.Lock()
defer p.seenMessagesMx.Unlock()
if p.seenMessages.Has(id) {
return false
}

p.seenMessages.Add(id)
return true
return p.seenMessages.Add(id)
}

// subscribedToMessage returns whether we are subscribed to one of the topics
Expand Down
82 changes: 33 additions & 49 deletions timecache/first_seen_cache.go
Original file line number Diff line number Diff line change
@@ -1,72 +1,56 @@
package timecache

import (
"container/list"
"context"
"sync"
"time"
)

// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
// FirstSeenCache is a time cache that only marks the expiry of a message when first added.
type FirstSeenCache struct {
q *list.List
m map[string]time.Time
span time.Duration
guard *sync.RWMutex
}
lk sync.RWMutex
m map[string]time.Time
ttl time.Duration

func newFirstSeenCache(span time.Duration) TimeCache {
return &FirstSeenCache{
q: list.New(),
m: make(map[string]time.Time),
span: span,
guard: new(sync.RWMutex),
}
done func()
}

func (tc FirstSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()
var _ TimeCache = (*FirstSeenCache)(nil)

_, ok := tc.m[s]
if ok {
log.Debug("first-seen: got same entry")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we removed the log here, we can also remove the global log

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess... doesnt hurt all that much though.

return
func newFirstSeenCache(ttl time.Duration) *FirstSeenCache {
tc := &FirstSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}

// TODO(#515): Do GC in the background
tc.sweep()
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)

tc.m[s] = time.Now()
tc.q.PushFront(s)
return tc
}

func (tc FirstSeenCache) sweep() {
for {
back := tc.q.Back()
if back == nil {
return
}
func (tc *FirstSeenCache) Done() {
tc.done()
}

v := back.Value.(string)
t, ok := tc.m[v]
if !ok {
panic("inconsistent cache state")
}
func (tc *FirstSeenCache) Has(s string) bool {
tc.lk.RLock()
defer tc.lk.RUnlock()

if time.Since(t) > tc.span {
tc.q.Remove(back)
delete(tc.m, v)
} else {
return
}
}
_, ok := tc.m[s]
return ok
}

func (tc FirstSeenCache) Has(s string) bool {
tc.guard.RLock()
defer tc.guard.RUnlock()
func (tc *FirstSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()

_, ok := tc.m[s]
if ok {
return false
}

ts, ok := tc.m[s]
// Only consider the entry found if it was present in the cache AND hadn't already expired.
return ok && time.Since(ts) <= tc.span
tc.m[s] = time.Now().Add(tc.ttl)
return true
}
9 changes: 7 additions & 2 deletions timecache/first_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@ func TestFirstSeenCacheFound(t *testing.T) {
}

func TestFirstSeenCacheExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newFirstSeenCache(time.Second)
for i := 0; i < 11; i++ {
for i := 0; i < 10; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

time.Sleep(2 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if we could use a mock clock here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... future pr ;)

if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newFirstSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
Expand Down
94 changes: 34 additions & 60 deletions timecache/last_seen_cache.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,58 @@
package timecache

import (
"context"
"sync"
"time"

"github.com/emirpasic/gods/maps/linkedhashmap"
)

// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
// "old" entries will be purged from the cache.
//
// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
// issues that might increase the number of duplicate messages in the network.
//
// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
//
// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
// LastSeenCache is a time cache that extends the expiry of a seen message when added
// or checked for presence with Has..
type LastSeenCache struct {
m *linkedhashmap.Map
span time.Duration
guard *sync.Mutex
lk sync.Mutex
m map[string]time.Time
ttl time.Duration

done func()
}

func newLastSeenCache(span time.Duration) TimeCache {
return &LastSeenCache{
m: linkedhashmap.New(),
span: span,
guard: new(sync.Mutex),
var _ TimeCache = (*LastSeenCache)(nil)

func newLastSeenCache(ttl time.Duration) *LastSeenCache {
tc := &LastSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}
}

func (tc *LastSeenCache) Add(s string) {
tc.guard.Lock()
defer tc.guard.Unlock()
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)

tc.add(s)
return tc
}

// Garbage collect expired entries
// TODO(#515): Do GC in the background
tc.gc()
func (tc *LastSeenCache) Done() {
tc.done()
}

func (tc *LastSeenCache) add(s string) {
// We don't need a lock here because this function is always called with the lock already acquired.
func (tc *LastSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()

// If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
// an accurate sliding window.
tc.m.Remove(s)
now := time.Now()
tc.m.Put(s, &now)
}
_, ok := tc.m[s]
tc.m[s] = time.Now().Add(tc.ttl)

func (tc *LastSeenCache) gc() {
// We don't need a lock here because this function is always called with the lock already acquired.
iter := tc.m.Iterator()
for iter.Next() {
key := iter.Key()
ts := iter.Value().(*time.Time)
// Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
// entries hereafter will be unexpired.
if time.Since(*ts) <= tc.span {
return
}
tc.m.Remove(key)
}
return !ok
}

func (tc *LastSeenCache) Has(s string) bool {
tc.guard.Lock()
defer tc.guard.Unlock()
tc.lk.Lock()
defer tc.lk.Unlock()

// If the entry exists and has not already expired, slide it forward.
if ts, found := tc.m.Get(s); found {
if t := ts.(*time.Time); time.Since(*t) <= tc.span {
tc.add(s)
return true
}
_, ok := tc.m[s]
if ok {
tc.m[s] = time.Now().Add(tc.ttl)
}
return false

return ok
}
8 changes: 7 additions & 1 deletion timecache/last_seen_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ func TestLastSeenCacheFound(t *testing.T) {
}

func TestLastSeenCacheExpire(t *testing.T) {
backgroundSweepInterval = time.Second
tc := newLastSeenCache(time.Second)
for i := 0; i < 11; i++ {
tc.Add(fmt.Sprint(i))
time.Sleep(time.Millisecond * 100)
}

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
}

func TestLastSeenCacheSlideForward(t *testing.T) {
t.Skip("timing is too fine grained to run in CI")

tc := newLastSeenCache(time.Second)
i := 0

Expand Down Expand Up @@ -74,10 +78,12 @@ func TestLastSeenCacheSlideForward(t *testing.T) {
}

func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) {
backgroundSweepInterval = time.Second

tc := newLastSeenCache(time.Second)
tc.Add(fmt.Sprint(0))
time.Sleep(1100 * time.Millisecond)

time.Sleep(2 * time.Second)
if tc.Has(fmt.Sprint(0)) {
t.Fatal("should have dropped this from the cache already")
}
Expand Down
15 changes: 8 additions & 7 deletions timecache/time_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@ const (
)

type TimeCache interface {
Add(string)
Add(string) bool
vyzo marked this conversation as resolved.
Show resolved Hide resolved
Has(string) bool
Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is .Close() not the preferred convention? .Done() is used in context.Context which returns a channel that's closed when the context is finished. .Done() is also used in waitgroups to signal completion (but not cleanup).

.Close() seems closer to the intent.

That said. This is minor, really just a nit

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close returns error... there is none here, and there is no point.

Copy link
Collaborator Author

@vyzo vyzo Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially i used Closer actually, but then i realize i am making work for myself with the error ;)

}

// NewTimeCache defaults to the original ("first seen") cache implementation
func NewTimeCache(span time.Duration) TimeCache {
return NewTimeCacheWithStrategy(Strategy_FirstSeen, span)
func NewTimeCache(ttl time.Duration) TimeCache {
return NewTimeCacheWithStrategy(Strategy_FirstSeen, ttl)
}

func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache {
func NewTimeCacheWithStrategy(strategy Strategy, ttl time.Duration) TimeCache {
switch strategy {
case Strategy_FirstSeen:
return newFirstSeenCache(span)
return newFirstSeenCache(ttl)
case Strategy_LastSeen:
return newLastSeenCache(span)
return newLastSeenCache(ttl)
default:
// Default to the original time cache implementation
return newFirstSeenCache(span)
return newFirstSeenCache(ttl)
}
}
35 changes: 35 additions & 0 deletions timecache/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package timecache

import (
"context"
"sync"
"time"
)

var backgroundSweepInterval = time.Minute

func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) {
ticker := time.NewTimer(backgroundSweepInterval)
defer ticker.Stop()

for {
select {
case now := <-ticker.C:
sweep(lk, m, now)

case <-ctx.Done():
return
}
}
}

func sweep(lk sync.Locker, m map[string]time.Time, now time.Time) {
lk.Lock()
defer lk.Unlock()

for k, expiry := range m {
if expiry.Before(now) {
delete(m, k)
}
}
}