Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] RACE in Sublist #641

Merged
merged 3 commits into from
Mar 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func addNodeToResults(n *node, results *SublistResult) {
if i := findQSliceForSub(qr[0], results.qsubs); i >= 0 {
results.qsubs[i] = append(results.qsubs[i], qr...)
} else {
results.qsubs = append(results.qsubs, qr)
results.qsubs = append(results.qsubs, append([]*subscription(nil), qr...))
}
}
}
Expand Down
176 changes: 176 additions & 0 deletions server/sublist_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2012-2017 Apcera Inc. All rights reserved.
// Copyright 2018 Synadia Communications Inc. All rights reserved.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change all of these at some point next week.


package server

import (
Expand Down Expand Up @@ -509,6 +512,179 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) {
}
}

func TestSublistRaceOnRemove(t *testing.T) {
s := NewSublist()

var (
total = 100
subs = make(map[int]*subscription, total) // use map for randomness
)
for i := 0; i < total; i++ {
sub := newQSub("foo", "bar")
subs[i] = sub
}

for i := 0; i < 2; i++ {
for _, sub := range subs {
s.Insert(sub)
}
// Call Match() once or twice, to make sure we get from cache
if i == 1 {
s.Match("foo")
}
// This will be from cache when i==1
r := s.Match("foo")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for _, sub := range subs {
s.Remove(sub)
}
wg.Done()
}()
for _, qsub := range r.qsubs {
for i := 0; i < len(qsub); i++ {
sub := qsub[i]
if string(sub.queue) != "bar" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rewrite it above to make it deterministic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here?

t.Fatalf("Queue name should be bar, got %s", qsub[i].queue)
}
}
}
wg.Wait()
}

// Repeat tests with regular subs
for i := 0; i < total; i++ {
sub := newSub("foo")
subs[i] = sub
}

for i := 0; i < 2; i++ {
for _, sub := range subs {
s.Insert(sub)
}
// Call Match() once or twice, to make sure we get from cache
if i == 1 {
s.Match("foo")
}
// This will be from cache when i==1
r := s.Match("foo")
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for _, sub := range subs {
s.Remove(sub)
}
wg.Done()
}()
for i := 0; i < len(r.psubs); i++ {
sub := r.psubs[i]
if string(sub.subject) != "foo" {
t.Fatalf("Subject should be foo, got %s", sub.subject)
}
}
wg.Wait()
}
}

func TestSublistRaceOnInsert(t *testing.T) {
s := NewSublist()

var (
total = 100
subs = make(map[int]*subscription, total) // use map for randomness
wg sync.WaitGroup
)
for i := 0; i < total; i++ {
sub := newQSub("foo", "bar")
subs[i] = sub
}
wg.Add(1)
go func() {
for _, sub := range subs {
s.Insert(sub)
}
wg.Done()
}()
for i := 0; i < 1000; i++ {
r := s.Match("foo")
for _, qsubs := range r.qsubs {
for _, qsub := range qsubs {
if string(qsub.queue) != "bar" {
t.Fatalf("Expected queue name to be bar, got %v", string(qsub.queue))
}
}
}
}
wg.Wait()

// Repeat the test with plain subs
for i := 0; i < total; i++ {
sub := newSub("foo")
subs[i] = sub
}
wg.Add(1)
go func() {
for _, sub := range subs {
s.Insert(sub)
}
wg.Done()
}()
for i := 0; i < 1000; i++ {
r := s.Match("foo")
for _, sub := range r.psubs {
if string(sub.subject) != "foo" {
t.Fatalf("Expected subject to be foo, got %v", string(sub.subject))
}
}
}
wg.Wait()
}

func TestSublistRaceOnMatch(t *testing.T) {
s := NewSublist()
s.Insert(newQSub("foo.*", "workers"))
s.Insert(newQSub("foo.bar", "workers"))
s.Insert(newSub("foo.*"))
s.Insert(newSub("foo.bar"))

wg := sync.WaitGroup{}
wg.Add(2)
errCh := make(chan error, 2)
f := func() {
defer wg.Done()
for i := 0; i < 10; i++ {
r := s.Match("foo.bar")
for _, sub := range r.psubs {
if !strings.HasPrefix(string(sub.subject), "foo.") {
errCh <- fmt.Errorf("Wrong subject: %s", sub.subject)
return
}
}
for _, qsub := range r.qsubs {
for _, sub := range qsub {
if string(sub.queue) != "workers" {
errCh <- fmt.Errorf("Wrong queue name: %s", sub.queue)
return
}
}
}
// Empty cache to maximize chance for race
s.Lock()
delete(s.cache, "foo.bar")
s.Unlock()
}
}
go f()
go f()
wg.Wait()
select {
case e := <-errCh:
t.Fatalf(e.Error())
default:
}
}

// -- Benchmarks Setup --

var subs []*subscription
Expand Down