Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use atomic.Load to access fields used in /varz and /subsz requests. #445

Merged
merged 2 commits into from
Mar 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,11 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
v.TotalConnections = s.totalClients
v.Routes = len(s.routes)
v.Remotes = len(s.remotes)
v.InMsgs = s.inMsgs
v.InBytes = s.inBytes
v.OutMsgs = s.outMsgs
v.OutBytes = s.outBytes
v.SlowConsumers = s.slowConsumers
v.InMsgs = atomic.LoadInt64(&s.inMsgs)
v.InBytes = atomic.LoadInt64(&s.inBytes)
v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
v.OutBytes = atomic.LoadInt64(&s.outBytes)
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.Subscriptions = s.sl.Count()
s.httpReqStats[VarzPath]++
// Need a copy here since s.httpReqStas can change while doing
Expand Down
6 changes: 3 additions & 3 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,9 @@ func (s *Sublist) Stats() *SublistStats {
st.NumCache = uint32(len(s.cache))
st.NumInserts = s.inserts
st.NumRemoves = s.removes
st.NumMatches = s.matches
if s.matches > 0 {
st.CacheHitRate = float64(s.cacheHits) / float64(s.matches)
st.NumMatches = atomic.LoadUint64(&s.matches)
if st.NumMatches > 0 {
st.CacheHitRate = float64(atomic.LoadUint64(&s.cacheHits)) / float64(st.NumMatches)
}
// whip through cache for fanout stats
tot, max := 0, 0
Expand Down
93 changes: 93 additions & 0 deletions test/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"testing"
"time"

"sync"
"sync/atomic"

"github.com/nats-io/gnatsd/server"
"github.com/nats-io/go-nats"
)
Expand All @@ -30,6 +33,34 @@ func runMonitorServer() *server.Server {
return RunServer(&opts)
}

// Runs a clustered pair of monitor servers for testing the /routez endpoint
func runMonitorServerClusteredPair(t *testing.T) (*server.Server, *server.Server) {
resetPreviousHTTPConnections()
opts := DefaultTestOptions
opts.Port = CLIENT_PORT
opts.HTTPPort = MONITOR_PORT
opts.HTTPHost = "localhost"
opts.Cluster.Host = "127.0.0.1"
opts.Cluster.Port = 10223
opts.Routes = server.RoutesFromStr("nats-route://127.0.0.1:10222")

s1 := RunServer(&opts)

opts2 := DefaultTestOptions
opts2.Port = CLIENT_PORT + 1
opts2.HTTPPort = MONITOR_PORT + 1
opts2.HTTPHost = "localhost"
opts2.Cluster.Host = "127.0.0.1"
opts2.Cluster.Port = 10222
opts2.Routes = server.RoutesFromStr("nats-route://127.0.0.1:10223")

s2 := RunServer(&opts2)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to call checkClusterFormed(t, s1, s2) here to ensure cluster is formed, instead of using time.Sleep(2 * time.Second) in the caller. You would need to pass t *testing.T as parameter to runMonitorServerClusteredPair().

Copy link
Member Author

@ColinSullivan1 ColinSullivan1 Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

checkClusterFormed(t, s1, s2)

return s1, s2
}

func runMonitorServerNoHTTPPort() *server.Server {
resetPreviousHTTPConnections()
opts := DefaultTestOptions
Expand Down Expand Up @@ -60,6 +91,68 @@ func TestNoMonitorPort(t *testing.T) {
}
}

// testEndpointDataRace tests a monitoring endpoint for data races by polling
// while client code acts to ensure statistics are updated. It is designed to
// run under the -race flag to catch violations. The caller must start the
// NATS server.
func testEndpointDataRace(endpoint string, t *testing.T) {
var doneWg sync.WaitGroup

url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)

// Poll as fast as we can, while creating connections, publishing,
// and subscribing.
clientDone := int64(0)
doneWg.Add(1)
go func() {
for atomic.LoadInt64(&clientDone) == 0 {
resp, err := http.Get(url + endpoint)
if err != nil {
t.Errorf("Expected no error: Got %v\n", err)
} else {
resp.Body.Close()
}
}
doneWg.Done()
}()

// create connections, subscriptions, and publish messages to
// update the monitor variables.
var conns []net.Conn
for i := 0; i < 50; i++ {
cl := createClientConnSubscribeAndPublish(t)
// keep a few connections around to test monitor variables.
if i%10 == 0 {
conns = append(conns, cl)
} else {
cl.Close()
}
}
atomic.AddInt64(&clientDone, 1)

// wait for the endpoint polling goroutine to exit
doneWg.Wait()

// cleanup the conns
for _, cl := range conns {
cl.Close()
}
}

func TestEndpointDataRaces(t *testing.T) {
// setup a small cluster to test /routez
s1, s2 := runMonitorServerClusteredPair(t)
defer s1.Shutdown()
defer s2.Shutdown()

// test all of our endpoints
testEndpointDataRace("varz", t)
testEndpointDataRace("connz", t)
testEndpointDataRace("routez", t)
testEndpointDataRace("subsz", t)
testEndpointDataRace("stacksz", t)
}

func TestVarz(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
Expand Down