Sublist Shared Cache Improvements #726

Merged
merged 5 commits into from Aug 27, 2018
150 changes: 105 additions & 45 deletions server/sublist.go
@@ -42,6 +42,8 @@ var (
const (
// slCacheMax is used to bound the size of the frontend cache
slCacheMax = 1024
// If we run a sweeper we will drain to this count.
slCacheSweep = 512
// plistMin is our lower bounds to create a fast plist for Match.
plistMin = 256
)
@@ -60,8 +62,10 @@ type Sublist struct {
cacheHits uint64
inserts uint64
removes uint64
cache map[string]*SublistResult
root *level
cache sync.Map
cacheNum int32
ccSweep int32
count uint32
}

@@ -92,7 +96,7 @@ func newLevel() *level {

// New will create a default sublist
func NewSublist() *Sublist {
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
return &Sublist{root: newLevel()}
}

// Insert adds a subscription into the sublist
@@ -202,50 +206,76 @@ func copyResult(r *SublistResult) *SublistResult {
return nr
}

// Adds a new sub to an existing result.
func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult {
// Copy since others may have a reference.
nr := copyResult(r)
if sub.queue == nil {
nr.psubs = append(nr.psubs, sub)
} else {
if i := findQSliceForSub(sub, nr.qsubs); i >= 0 {
nr.qsubs[i] = append(nr.qsubs[i], sub)
} else {
nr.qsubs = append(nr.qsubs, []*subscription{sub})
}
}
return nr
}

// addToCache will add the new entry to existing cache
// entries if needed. Assumes write lock is held.
func (s *Sublist) addToCache(subject string, sub *subscription) {
for k, r := range s.cache {
if matchLiteral(k, subject) {
// Copy since others may have a reference.
nr := copyResult(r)
if sub.queue == nil {
nr.psubs = append(nr.psubs, sub)
} else {
if i := findQSliceForSub(sub, nr.qsubs); i >= 0 {
nr.qsubs[i] = append(nr.qsubs[i], sub)
} else {
nr.qsubs = append(nr.qsubs, []*subscription{sub})
}
}
s.cache[k] = nr
// If the subject is literal we can match it directly.
if subjectIsLiteral(subject) {
if v, ok := s.cache.Load(subject); ok {
r := v.(*SublistResult)
s.cache.Store(subject, r.addSubToResult(sub))
}
return
}
s.cache.Range(func(k, v interface{}) bool {
key := k.(string)
r := v.(*SublistResult)
if matchLiteral(key, subject) {
s.cache.Store(key, r.addSubToResult(sub))
}
return true
})
}

// removeFromCache will remove the sub from any active cache entries.
// Assumes write lock is held.
func (s *Sublist) removeFromCache(subject string, sub *subscription) {
for k := range s.cache {
if !matchLiteral(k, subject) {
continue
// If the subject is literal we can match it directly.
if subjectIsLiteral(subject) {
// Load for accounting
if _, ok := s.cache.Load(subject); ok {
s.cache.Delete(subject)
atomic.AddInt32(&s.cacheNum, -1)
}
// Since someone else may be referencing it, we can't modify the list
// safely; just let it re-populate.
delete(s.cache, k)
return
}
s.cache.Range(func(k, v interface{}) bool {
key := k.(string)
if matchLiteral(key, subject) {
// Since someone else may be referencing it, we can't modify the list
// safely; just let it re-populate.
s.cache.Delete(key)
atomic.AddInt32(&s.cacheNum, -1)
}
return true
})
}

// Match will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) Match(subject string) *SublistResult {
s.RLock()
atomic.AddUint64(&s.matches, 1)
rc, ok := s.cache[subject]
s.RUnlock()
if ok {

// Check cache first.
if r, ok := s.cache.Load(subject); ok {
atomic.AddUint64(&s.cacheHits, 1)
return rc
return r.(*SublistResult)
}

tsa := [32]string{}
@@ -262,23 +292,36 @@ func (s *Sublist) Match(subject string) *SublistResult {
// FIXME(dlc) - Make shared pool between sublist and client readLoop?
result := &SublistResult{}

s.Lock()
s.RLock()
matchLevel(s.root, tokens, result)
s.RUnlock()

// Add to our cache
s.cache[subject] = result
// Bound the number of entries to sublistMaxCache
if len(s.cache) > slCacheMax {
for k := range s.cache {
delete(s.cache, k)
break
}
s.cache.Store(subject, result)
Contributor:
I think this introduces a race. For example, if a client subscribes between the unlock and the call to s.cache.Store, it will be shadowed and won't be delivered messages for this subscription until the cache entry gets cleared.
Also, for queue subscriptions, a client could have removed a subscription (but still be connected) and be delivered messages it is no longer processing, "stealing" them from other valid subscriptions of the same queue.

The read lock should be held while calling s.cache.Store to ensure that the sublist won't be modified concurrently.
There may be similar issues with Load; although I don't see one at the moment, holding the read lock is probably a good idea there too.
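
To make the interleaving concrete, here is a minimal, self-contained model of the window described above. The types and names (sublist, computeMatch, storeMatch) are hypothetical simplifications, not the server's actual code; the only point is the ordering: the result is computed under the read lock, the lock is released, a new subscription is inserted (finding no cache entry to patch), and only then is the stale result stored.

// Hypothetical model of the race window: compute under RLock, release, then Store.
package main

import (
	"fmt"
	"sync"
)

type sublist struct {
	mu    sync.RWMutex
	subs  map[string][]string // subject -> subscriptions
	cache sync.Map            // subject -> []string, the cached match result
}

// computeMatch models the part of Match that runs under the read lock.
func (s *sublist) computeMatch(subject string) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.subs[subject]...)
}

// storeMatch models the part of Match that runs after the read lock is released.
func (s *sublist) storeMatch(subject string, r []string) {
	s.cache.Store(subject, r)
}

// insert adds a subscription and patches an existing cache entry, mirroring addToCache.
func (s *sublist) insert(subject, sub string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.subs[subject] = append(s.subs[subject], sub)
	if v, ok := s.cache.Load(subject); ok {
		s.cache.Store(subject, append(v.([]string), sub))
	}
}

func main() {
	s := &sublist{subs: make(map[string][]string)}

	// Force the problematic interleaving deterministically:
	r := s.computeMatch("foo.bar") // reader computes a result with no subs...
	s.insert("foo.bar", "subA")    // ...a new sub arrives; no cache entry exists to patch...
	s.storeMatch("foo.bar", r)     // ...then the stale result is published to the cache.

	v, _ := s.cache.Load("foo.bar")
	fmt.Println(v.([]string)) // prints [] — subA is shadowed until the entry is evicted
}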

Member:
Exactly why I disliked that Go added sync.Map. It "introduces" these kinds of issues.

Contributor:
Same class of problems may happen with atomic operations but they are still useful. Concurrency is hard to reason about...

Member Author:
Let me take a closer look. Let me know if your production issue resolves with this fix though.

Member Author:
OK, #729 should handle it. Thanks for pointing this out, you were correct.
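
For reference, a sketch of the ordering the reviewer suggests: keep the read lock held across the cache Store so an Insert or Remove cannot slip in between computing the result and publishing it. This is an illustrative assumption using symbols from this file, with a hypothetical helper name (matchAndCache); it is not necessarily the exact change made in #729.

// Hypothetical sketch: publish the cache entry while still holding the read
// lock. A concurrent Insert/Remove must wait for the write lock and will then
// see this entry and patch or evict it via addToCache/removeFromCache.
func (s *Sublist) matchAndCache(subject string, tokens []string) *SublistResult {
	result := &SublistResult{}
	s.RLock()
	matchLevel(s.root, tokens, result)
	s.cache.Store(subject, result)
	n := atomic.AddInt32(&s.cacheNum, 1)
	s.RUnlock()

	// The sweep can still be kicked off outside the lock.
	if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
		go s.reduceCacheCount()
	}
	return result
}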

n := atomic.AddInt32(&s.cacheNum, 1)

if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
go s.reduceCacheCount()
}
s.Unlock()

return result
}

// Remove entries in the cache until we are under the maximum.
// TODO(dlc) this could be smarter now that it's not inline.
func (s *Sublist) reduceCacheCount() {
defer atomic.StoreInt32(&s.ccSweep, 0)
// If we are over the cache limit, randomly drop entries until we drain down to slCacheSweep.
s.cache.Range(func(k, v interface{}) bool {
s.cache.Delete(k.(string))
n := atomic.AddInt32(&s.cacheNum, -1)
if n < slCacheSweep {
return false
}
return true
})
}

// This will add in a node's results to the total results.
func addNodeToResults(n *node, results *SublistResult) {
// Normal subscriptions
@@ -501,7 +544,7 @@ func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
if found && n.plist != nil {
// This will brute force remove the plist to perform
// correct behavior. Will get repopulated on a call
//to Match as needed.
// to Match as needed.
n.plist = nil
}
return found
@@ -527,9 +570,7 @@ func (s *Sublist) Count() uint32 {

// CacheCount returns the number of result sets in the cache.
func (s *Sublist) CacheCount() int {
s.RLock()
defer s.RUnlock()
return len(s.cache)
return int(atomic.LoadInt32(&s.cacheNum))
}

// Public stats for the sublist
@@ -551,25 +592,30 @@ func (s *Sublist) Stats() *SublistStats {

st := &SublistStats{}
st.NumSubs = s.count
st.NumCache = uint32(len(s.cache))
st.NumCache = uint32(atomic.LoadInt32(&s.cacheNum))
st.NumInserts = s.inserts
st.NumRemoves = s.removes
st.NumMatches = atomic.LoadUint64(&s.matches)
if st.NumMatches > 0 {
st.CacheHitRate = float64(atomic.LoadUint64(&s.cacheHits)) / float64(st.NumMatches)
}
// whip through cache for fanout stats

// whip through cache for fanout stats; this can be off if the cache is full and doing evictions.
tot, max := 0, 0
for _, r := range s.cache {
clen := 0
s.cache.Range(func(k, v interface{}) bool {
clen += 1
r := v.(*SublistResult)
l := len(r.psubs) + len(r.qsubs)
tot += l
if l > max {
max = l
}
}
return true
})
st.MaxFanout = uint32(max)
if tot > 0 {
st.AvgFanout = float64(tot) / float64(len(s.cache))
st.AvgFanout = float64(tot) / float64(clen)
}
return st
}
@@ -614,6 +660,20 @@ func visitLevel(l *level, depth int) int {
return maxDepth
}

// Determine if the subject has any wildcards. Fast version; does not check
// for a valid subject. Used in the caching layer.
func subjectIsLiteral(subject string) bool {
for i, c := range subject {
if c == pwc || c == fwc {
if (i == 0 || subject[i-1] == btsep) &&
(i+1 == len(subject) || subject[i+1] == btsep) {
return false
}
}
}
return true
}

// IsValidSubject returns true if a subject is valid, false otherwise
func IsValidSubject(subject string) bool {
if subject == "" {