From 668229fc1d55362afc833ea7bbfc7edabc4c0316 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 9 Mar 2018 19:05:33 -0700 Subject: [PATCH 1/3] [FIXED] RACE between sublist remove and go through match results This would manifest for instance when server tries to send messages to queue subscribers and a subscription is unsubsribed at the same time. Resolves #640 --- server/sublist.go | 9 +++-- server/sublist_test.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index 4df3927e49e..62bfdb8b624 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -236,8 +236,13 @@ func (s *Sublist) Match(subject string) *SublistResult { s.Lock() matchLevel(s.root, tokens, result) + // Store in cache and return to caller a copy of the results to avoid + // race when sub is removed from sublist and caller walks through the + // results. + cr := copyResult(result) + // Add to our cache - s.cache[subject] = result + s.cache[subject] = cr // Bound the number of entries to sublistMaxCache if len(s.cache) > slCacheMax { for k := range s.cache { @@ -247,7 +252,7 @@ func (s *Sublist) Match(subject string) *SublistResult { } s.Unlock() - return result + return cr } // This will add in a node's results to the total results. diff --git a/server/sublist_test.go b/server/sublist_test.go index a7df7b15829..afad7a7d524 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1,3 +1,6 @@ +// Copyright 2012-2017 Apcera Inc. All rights reserved. +// Copyright 2018 Synadia Communications Inc. All rights reserved. + package server import ( @@ -509,6 +512,81 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) { } } +func TestSubListRaceOnRemove(t *testing.T) { + s := NewSublist() + + var ( + total = 100 + subs = make(map[int]*subscription, total) // use map for randomness + ) + for i := 0; i < total; i++ { + sub := newQSub("foo", "bar") + subs[i] = sub + } + + for i := 0; i < 2; i++ { + for _, sub := range subs { + s.Insert(sub) + } + // Call Match() once or twice, to make sure we get from cache + if i == 1 { + s.Match("foo") + } + // This will be from cache when i==1 + r := s.Match("foo") + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for _, sub := range subs { + s.Remove(sub) + } + wg.Done() + }() + for _, qsub := range r.qsubs { + for i := 0; i < len(qsub); i++ { + sub := qsub[i] + if string(sub.queue) != "bar" { + t.Fatalf("Queue name should be bar, got %s", qsub[i].queue) + } + } + } + wg.Wait() + } + + // Repeat tests with regular subs + for i := 0; i < total; i++ { + sub := newSub("foo") + subs[i] = sub + } + + for i := 0; i < 2; i++ { + for _, sub := range subs { + s.Insert(sub) + } + // Call Match() once or twice, to make sure we get from cache + if i == 1 { + s.Match("foo") + } + // This will be from cache when i==1 + r := s.Match("foo") + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for _, sub := range subs { + s.Remove(sub) + } + wg.Done() + }() + for i := 0; i < len(r.psubs); i++ { + sub := r.psubs[i] + if string(sub.subject) != "foo" { + t.Fatalf("Subject should be foo, got %s", sub.subject) + } + } + wg.Wait() + } +} + // -- Benchmarks Setup -- var subs []*subscription From eb28cf0eda59434ee96bdf3e907114ef02e95c07 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Sat, 10 Mar 2018 08:35:15 -0700 Subject: [PATCH 2/3] Do the copy only of the array of subscriptions that we add to the results Instead of making a copy of the whole results, make sure that we don't pass a sublist array to the result but its copy. --- server/sublist.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index 62bfdb8b624..ffa185a4c7f 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -236,13 +236,8 @@ func (s *Sublist) Match(subject string) *SublistResult { s.Lock() matchLevel(s.root, tokens, result) - // Store in cache and return to caller a copy of the results to avoid - // race when sub is removed from sublist and caller walks through the - // results. - cr := copyResult(result) - // Add to our cache - s.cache[subject] = cr + s.cache[subject] = result // Bound the number of entries to sublistMaxCache if len(s.cache) > slCacheMax { for k := range s.cache { @@ -252,7 +247,7 @@ func (s *Sublist) Match(subject string) *SublistResult { } s.Unlock() - return cr + return result } // This will add in a node's results to the total results. @@ -266,7 +261,9 @@ func addNodeToResults(n *node, results *SublistResult) { if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 { results.qsubs[i] = append(results.qsubs[i], qr...) } else { - results.qsubs = append(results.qsubs, qr) + copyqr := make([]*subscription, len(qr)) + copy(copyqr, qr) + results.qsubs = append(results.qsubs, copyqr) } } } From 3bfb2d7cb456b4f25354cf259266c532a3f9d5eb Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 15 Mar 2018 14:54:10 -0600 Subject: [PATCH 3/3] Variant of copy and add some tests Use the append([]*subscription(nil), qr...) notation instead of make()+copy(). --- server/sublist.go | 4 +- server/sublist_test.go | 100 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index ffa185a4c7f..3af9fb39ab7 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -261,9 +261,7 @@ func addNodeToResults(n *node, results *SublistResult) { if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 { results.qsubs[i] = append(results.qsubs[i], qr...) } else { - copyqr := make([]*subscription, len(qr)) - copy(copyqr, qr) - results.qsubs = append(results.qsubs, copyqr) + results.qsubs = append(results.qsubs, append([]*subscription(nil), qr...)) } } } diff --git a/server/sublist_test.go b/server/sublist_test.go index afad7a7d524..dbea2e132ee 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -512,7 +512,7 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) { } } -func TestSubListRaceOnRemove(t *testing.T) { +func TestSublistRaceOnRemove(t *testing.T) { s := NewSublist() var ( @@ -587,6 +587,104 @@ func TestSubListRaceOnRemove(t *testing.T) { } } +func TestSublistRaceOnInsert(t *testing.T) { + s := NewSublist() + + var ( + total = 100 + subs = make(map[int]*subscription, total) // use map for randomness + wg sync.WaitGroup + ) + for i := 0; i < total; i++ { + sub := newQSub("foo", "bar") + subs[i] = sub + } + wg.Add(1) + go func() { + for _, sub := range subs { + s.Insert(sub) + } + wg.Done() + }() + for i := 0; i < 1000; i++ { + r := s.Match("foo") + for _, qsubs := range r.qsubs { + for _, qsub := range qsubs { + if string(qsub.queue) != "bar" { + t.Fatalf("Expected queue name to be bar, got %v", string(qsub.queue)) + } + } + } + } + wg.Wait() + + // Repeat the test with plain subs + for i := 0; i < total; i++ { + sub := newSub("foo") + subs[i] = sub + } + wg.Add(1) + go func() { + for _, sub := range subs { + s.Insert(sub) + } + wg.Done() + }() + for i := 0; i < 1000; i++ { + r := s.Match("foo") + for _, sub := range r.psubs { + if string(sub.subject) != "foo" { + t.Fatalf("Expected subject to be foo, got %v", string(sub.subject)) + } + } + } + wg.Wait() +} + +func TestSublistRaceOnMatch(t *testing.T) { + s := NewSublist() + s.Insert(newQSub("foo.*", "workers")) + s.Insert(newQSub("foo.bar", "workers")) + s.Insert(newSub("foo.*")) + s.Insert(newSub("foo.bar")) + + wg := sync.WaitGroup{} + wg.Add(2) + errCh := make(chan error, 2) + f := func() { + defer wg.Done() + for i := 0; i < 10; i++ { + r := s.Match("foo.bar") + for _, sub := range r.psubs { + if !strings.HasPrefix(string(sub.subject), "foo.") { + errCh <- fmt.Errorf("Wrong subject: %s", sub.subject) + return + } + } + for _, qsub := range r.qsubs { + for _, sub := range qsub { + if string(sub.queue) != "workers" { + errCh <- fmt.Errorf("Wrong queue name: %s", sub.queue) + return + } + } + } + // Empty cache to maximize chance for race + s.Lock() + delete(s.cache, "foo.bar") + s.Unlock() + } + } + go f() + go f() + wg.Wait() + select { + case e := <-errCh: + t.Fatalf(e.Error()) + default: + } +} + // -- Benchmarks Setup -- var subs []*subscription