Skip to content

Commit

Permalink
Merge pull request #94 from resgateio/bugfix/gh-93-queued-event-may-b…
Browse files Browse the repository at this point in the history
…e-sent-multiple-times

Bugfix/gh 93 queued event may be sent multiple times
  • Loading branch information
jirenius authored Jul 29, 2019
2 parents 5f456f9 + 90a6465 commit dadfd33
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 9 deletions.
10 changes: 8 additions & 2 deletions logger/memlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"log"
"sync"
)

// MemLogger writes log messages to os.Stderr
Expand All @@ -12,6 +13,7 @@ type MemLogger struct {
b *bytes.Buffer
debug bool
trace bool
mu sync.Mutex
}

// NewMemLogger returns a new logger that writes to a bytes buffer
Expand All @@ -33,24 +35,28 @@ func NewMemLogger(debug bool, trace bool) *MemLogger {

// Logf writes a log entry
func (l *MemLogger) Logf(prefix string, format string, v ...interface{}) {
l.mu.Lock()
l.log.Print(prefix, fmt.Sprintf(format, v...))
l.mu.Unlock()
}

// Debugf writes a debug entry
func (l *MemLogger) Debugf(prefix string, format string, v ...interface{}) {
if l.debug {
l.log.Print(prefix, fmt.Sprintf(format, v...))
l.Logf(prefix, format, v...)
}
}

// Tracef writes a trace entry
func (l *MemLogger) Tracef(prefix string, format string, v ...interface{}) {
if l.trace {
l.log.Print(prefix, fmt.Sprintf(format, v...))
l.Logf(prefix, format, v...)
}
}

// String returns the log
func (l *MemLogger) String() string {
l.mu.Lock()
defer l.mu.Unlock()
return l.b.String()
}
14 changes: 7 additions & 7 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (s *Subscription) GetRPCResources() *rpc.Resources {
return r
}

// ReleaseRPCResources will unlock all resources locked by GetRPCResource
// and will mark the subscription as sent.
// ReleaseRPCResources will unlock all resources locked by GetRPCResource,
// unqueue any events, and mark the subscription as sent.
func (s *Subscription) ReleaseRPCResources() {
if s.state == stateDisposed ||
s.state == stateSent ||
Expand All @@ -280,17 +280,17 @@ func (s *Subscription) queueEvents() {

func (s *Subscription) unqueueEvents() {
s.isQueueing = false
eq := s.eventQueue
s.eventQueue = nil

for i, event := range s.eventQueue {
for i, event := range eq {
s.processEvent(event)
// Did one of the events activate queueing again?
if s.isQueueing {
s.eventQueue = s.eventQueue[i+1:]
s.eventQueue = append(eq[i+1:], s.eventQueue...)
return
}
}

s.eventQueue = nil
}

// populateResources iterates recursively down the subscription tree
Expand Down Expand Up @@ -389,7 +389,7 @@ func (s *Subscription) subscribeRef(v codec.Value) bool {
}

// collectRefs will wait for all references to be loaded
// and call doneLoading() once completed.
// and call the callback once completed.
func (s *Subscription) collectRefs(rcb *readyCallback) {
for rid, ref := range s.refs {
// Don't wait for already ready references
Expand Down
142 changes: 142 additions & 0 deletions test/18event_queuing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package test

import (
"encoding/json"
"testing"
)

// Test event while queuing
func TestEventWhileQueueing(t *testing.T) {
model := resourceData("test.model")

runTest(t, func(s *Session) {

c := s.Connect()
subscribeToTestCollection(t, s, c)

// Send event on collection and validate client event
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"}}`))

// Handle collection get request
req := s.
GetRequest(t).
AssertSubject(t, "get.test.model")

// Send additional add event on collection
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":2,"value":"newValue"}`))
c.AssertNoEvent(t, "test.collection")

req.RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"},"models":{"test.model":`+model+`}}`))
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":2,"value":"newValue"}`))
})
}

// Test event with added reference while queuing
func TestReferenceEventWhileQueuing(t *testing.T) {
model := resourceData("test.model")

runTest(t, func(s *Session) {

c := s.Connect()
subscribeToTestCollection(t, s, c)

// Send event on collection and let it queue
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"}}`))

// Get model get request
req1 := s.
GetRequest(t).
AssertSubject(t, "get.test.model")

// Send two additional add events on collection
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":2,"value":{"rid":"test.model2"}}`))
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":3,"value":"newValue"}`))

// Respond to model get request
req1.RespondSuccess(json.RawMessage(`{"model":` + model + `}`))
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"},"models":{"test.model":`+model+`}}`))

// Get second model get request
req2 := s.
GetRequest(t).
AssertSubject(t, "get.test.model2")

// Assert no more events are send
c.AssertNoEvent(t, "test.collection")

// Respond to second model get request
req2.RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":2,"value":{"rid":"test.model2"},"models":{"test.model2":`+model+`}}`))
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":3,"value":"newValue"}`))
})
}

// Test event with cached reference while queuing
func TestCachedReferenceEventWhileQueuing(t *testing.T) {
model := resourceData("test.model")
collection := resourceData("test.collection")

runTest(t, func(s *Session) {

c := s.Connect()
subscribeToTestCollectionParent(t, s, c, false)

// Send remove event on collection and validate client event
s.ResourceEvent("test.collection.parent", "remove", json.RawMessage(`{"idx":1}`))
c.GetEvent(t).Equals(t, "test.collection.parent.remove", json.RawMessage(`{"idx":1}`))

// Send event on collection and let it queue
s.ResourceEvent("test.collection.parent", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"}}`))
// Get model get request
req := s.
GetRequest(t).
AssertSubject(t, "get.test.model")

// Send additional add event with cached reference
s.ResourceEvent("test.collection.parent", "add", json.RawMessage(`{"idx":2,"value":{"rid":"test.collection"}}`))

// Respond to model get request
req.RespondSuccess(json.RawMessage(`{"model":` + model + `}`))
c.GetEvent(t).Equals(t, "test.collection.parent.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model"},"models":{"test.model":`+model+`}}`))
c.GetEvent(t).Equals(t, "test.collection.parent.add", json.RawMessage(`{"idx":2,"value":{"rid":"test.collection"},"collections":{"test.collection":`+collection+`}}`))
})
}

// Test event with cached reference while queuing
func TestUnqueueEventWithLoadedReferenceResource(t *testing.T) {
model := resourceData("test.model")

runTest(t, func(s *Session) {

c := s.Connect()
subscribeToTestModel(t, s, c)
subscribeToTestCollection(t, s, c)

// Send event on model with three references, and load only one
s.ResourceEvent("test.model", "change", json.RawMessage(`{"values":{"a":{"rid":"test.model.a"},"b":{"rid":"test.model.b"},"c":{"rid":"test.model.c"}}}`))
mreq := s.GetParallelRequests(t, 3)
mreq.GetRequest(t, "get.test.model.a").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

// Send multiple events on the same resource
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.b"}}`))
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":2,"value":{"rid":"test.model.a"}}`))

// Make sure test.model.a is loaded by making a roundtrip to the service
c.AssertNoEvent(t, "test.collection")

// Respond to the
mreq.GetRequest(t, "get.test.model.b").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

// Respond to model get request
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.b"},"models":{"test.model.b":`+model+`}}`))
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":2,"value":{"rid":"test.model.a"},"models":{"test.model.a":`+model+`}}`))

// Respond to the last resource needed for the change event
mreq.GetRequest(t, "get.test.model.c").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

c.GetEvent(t).Equals(t, "test.model.change", json.RawMessage(`{"values":{"a":{"rid":"test.model.a"},"b":{"rid":"test.model.b"},"c":{"rid":"test.model.c"}},"models":{"test.model.c":`+model+`}}`))
})
}

0 comments on commit dadfd33

Please sign in to comment.