Skip to content

Commit

Permalink
Merge pull request #117 from resgateio/feature/gh-110-allow-query-on-…
Browse files Browse the repository at this point in the history
…non-query-get-request

Feature/gh 110 allow query on non query get request
  • Loading branch information
jirenius authored Sep 30, 2019
2 parents 73f1a35 + e4c9196 commit 7d1d52b
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 37 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go
go:
- 1.11.x
- 1.12.x
- 1.13.x
install:
- go get github.com/mattn/goveralls
- go get honnef.co/go/tools/cmd/staticcheck
Expand All @@ -13,9 +13,9 @@ before_script:
- $(exit $(go fmt $PACKAGES | wc -l))
- misspell -error -locale US .
- staticcheck $PACKAGES
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.12\. ]] && [ "$TRAVIS_TAG" != "" ]; then ./scripts/cross_compile.sh $TRAVIS_TAG; fi
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.13\. ]] && [ "$TRAVIS_TAG" != "" ]; then ./scripts/cross_compile.sh $TRAVIS_TAG; fi
script:
- go test -i $PACKAGES
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.12\. ]]; then ./scripts/cover.sh TRAVIS; else go test -v -race $PACKAGES; fi
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.13\. ]]; then ./scripts/cover.sh TRAVIS; else go test -v -race $PACKAGES; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.12\. ]] && [ "$TRAVIS_TAG" != "" ]; then ghr --owner resgateio --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
- if [[ "$TRAVIS_GO_VERSION" =~ ^1\.13\. ]] && [ "$TRAVIS_TAG" != "" ]; then ghr --owner resgateio --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
14 changes: 8 additions & 6 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,17 @@ func (c *Client) parseMeta(msg *nats.Msg, rc responseCont) {
if v, ok := tag.Lookup("timeout"); ok {
timeout, err := strconv.Atoi(v)
if err == nil {
var removed bool
if rc.t == nil {
c.tq.Remove(msg.Sub)
removed = c.tq.Remove(msg.Sub)
} else {
rc.t.Stop()
removed = rc.t.Stop()
}
if removed {
rc.t = time.AfterFunc(time.Duration(timeout)*time.Millisecond, func() {
c.onTimeout(msg.Sub)
})
}
rc.t = time.AfterFunc(time.Duration(timeout)*time.Millisecond, func() {
c.onTimeout(msg.Sub)
})
c.mqReqs[msg.Sub] = rc
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/cover.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go test -v -covermode=atomic -coverprofile=./cover.out -coverpkg=./server/... ./

# If we have an arg, assume travis run and push to coveralls. Otherwise launch browser results
if [[ -n $1 ]]; then
$HOME/gopath/bin/goveralls -coverprofile=cover.out -service travis-ci
$HOME/gopath/bin/goveralls -coverprofile=cover.out -service=travis-ci
rm -rf ./cover.out
else
go tool cover -html=cover.out
Expand Down
2 changes: 2 additions & 0 deletions server/apiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func httpError(w http.ResponseWriter, err error, enc APIEncoder) {
switch rerr.Code {
case reserr.CodeNotFound:
fallthrough
case reserr.CodeMethodNotFound:
fallthrough
case reserr.CodeTimeout:
code = http.StatusNotFound
case reserr.CodeAccessDenied:
Expand Down
8 changes: 5 additions & 3 deletions server/rescache/eventSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func (e *EventSubscription) enqueueEvent(subj string, payload []byte) {
e.handleQueryEvent(subj, payload)
default:

if e.base == nil {
// Validate we have a base resource,
// and that it is not a link to a query resource.
if e.base == nil || e.base.query != "" {
return
}

Expand Down Expand Up @@ -315,7 +317,7 @@ func (e *EventSubscription) mqUnsubscribe() bool {

func (e *EventSubscription) handleResetResource() {
e.Enqueue(func() {
if e.base != nil {
if e.base != nil && e.base.query == "" {
e.base.handleResetResource()
}

Expand All @@ -327,7 +329,7 @@ func (e *EventSubscription) handleResetResource() {

func (e *EventSubscription) handleResetAccess() {
e.Enqueue(func() {
if e.base != nil {
if e.base != nil && e.base.query == "" {
e.base.handleResetAccess()
}

Expand Down
32 changes: 16 additions & 16 deletions server/rescache/resourceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rescache

import (
"encoding/json"
"errors"

"github.com/resgateio/resgate/server/codec"
)
Expand All @@ -17,8 +16,6 @@ const (
stateModel
)

var errQueryResourceOnNonQueryRequest = errors.New("query resource on non-query request")

// Model represents a RES model
// https://github.com/resgateio/resgate/blob/master/docs/res-protocol.md#models
type Model struct {
Expand Down Expand Up @@ -231,7 +228,7 @@ func (rs *ResourceSubscription) handleEventAdd(r *ResourceEvent) bool {
l := len(old)

if idx < 0 || idx > l {
rs.e.cache.Logf("Error processing event %s.%s: Idx %d not valid", rs.e.ResourceName, r.Event, idx)
rs.e.cache.Logf("Error processing event %s.%s: idx %d is out of bounds", rs.e.ResourceName, r.Event, idx)
return false
}

Expand Down Expand Up @@ -266,7 +263,7 @@ func (rs *ResourceSubscription) handleEventRemove(r *ResourceEvent) bool {
l := len(old)

if idx < 0 || idx >= l {
rs.e.cache.Logf("Error processing event %s.%s: Idx %d not valid", rs.e.ResourceName, r.Event, idx)
rs.e.cache.Logf("Error processing event %s.%s: idx %d is out of bounds", rs.e.ResourceName, r.Event, idx)
return false
}

Expand Down Expand Up @@ -309,7 +306,11 @@ func (rs *ResourceSubscription) unregister() {
delete(rs.e.queries, rs.query)
}
for _, q := range rs.links {
delete(rs.e.links, q)
if q == "" {
rs.e.base = nil
} else {
delete(rs.e.links, q)
}
}
rs.links = nil
}
Expand All @@ -322,11 +323,6 @@ func (rs *ResourceSubscription) processGetResponse(payload []byte, err error) (n
result, err = codec.DecodeGetResponse(payload)
}

// Assert a non-query request did not result in a query resource
if err == nil && rs.query == "" && result.Query != "" {
err = errQueryResourceOnNonQueryRequest
}

// Get request failed
if err != nil {
// Set state and store the error in case any other
Expand Down Expand Up @@ -356,12 +352,16 @@ func (rs *ResourceSubscription) processGetResponse(payload []byte, err error) (n
// Then we should create a link to the normalized query
if result.Query != rs.query {
nrs = rs.e.getResourceSubscription(result.Query)
// Replace resource subscription with the normalized version
if rs.e.links == nil {
rs.e.links = make(map[string]*ResourceSubscription)
if rs.query == "" {
rs.e.base = nrs
} else {
// Replace resource subscription with the normalized version
if rs.e.links == nil {
rs.e.links = make(map[string]*ResourceSubscription)
}
rs.e.links[rs.query] = nrs
delete(rs.e.queries, rs.query)
}
rs.e.links[rs.query] = nrs
delete(rs.e.queries, rs.query)
nrs.links = append(nrs.links, rs.query)

// Copy over all subscribers
Expand Down
39 changes: 39 additions & 0 deletions test/11system_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"encoding/json"
"fmt"
"testing"
)

Expand Down Expand Up @@ -168,3 +169,41 @@ func TestSystemResetEventTriggersUnsubscribeOnDeniedAccessCall(t *testing.T) {
c.GetEvent(t).Equals(t, "test.collection.custom", event)
})
}

// Test that a system.reset event triggers get requests on query model
func TestSystemResetTriggersGetRequestOnQueryModel(t *testing.T) {
tbl := []struct {
Query string
Normalized string
}{
{"foo=bar", "foo=bar"},
{"a=b&foo=bar", "foo=bar"},
{"", "foo=bar"},
}

for i, l := range tbl {
runNamedTest(t, fmt.Sprintf("#%d", i+1), func(s *Session) {
model := resourceData("test.model")

c := s.Connect()

// Get model
subscribeToTestQueryModel(t, s, c, l.Query, l.Normalized)

// Send system reset
s.SystemEvent("reset", json.RawMessage(`{"resources":["test.>"]}`))

// Validate a get request is sent
s.GetRequest(t).
AssertSubject(t, "get.test.model").
AssertPathPayload(t, "query", l.Normalized).
RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

// Validate no more requests are sent to NATS
c.AssertNoNATSRequest(t, "test.model")

// Validate no events are sent to client
c.AssertNoEvent(t, "test.model")
})
}
}
29 changes: 29 additions & 0 deletions test/12query_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,35 @@ func TestSendingQueryToNonQueryModel(t *testing.T) {
})
}

// Test query model response to non-query model subscribe requests
func TestSendingNonQueryToQueryModel(t *testing.T) {
runTest(t, func(s *Session) {
model := resourceData("test.model")
event := json.RawMessage(`{"foo":"bar"}`)
query := "q=foo&f=bar"

c := s.Connect()

// Send subscribe request
creq := c.Request("subscribe.test.model", nil)

// Handle model get and access request
mreqs := s.GetParallelRequests(t, 2)
mreqs.
GetRequest(t, "get.test.model").
RespondSuccess(json.RawMessage(`{"model":` + model + `,"query":"` + query + `"}`))
mreqs.GetRequest(t, "access.test.model").RespondSuccess(json.RawMessage(`{"get":true}`))

// Validate client response and validate
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`}}`))

// Send event on model and validate client event
s.ResourceEvent("test.model", "custom", event)
c.AssertNoEvent(t, "test.model")
c.AssertNoNATSRequest(t, "test.model")
})
}

// Test subscribing to query model
func TestSubscribingToQueryModel(t *testing.T) {
runTest(t, func(s *Session) {
Expand Down
1 change: 1 addition & 0 deletions test/15http_post_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestHTTPPostResponses(t *testing.T) {
{nil, nil, noRequest, http.StatusNotFound, mq.ErrRequestTimeout},
// CallResponse variants
{nil, fullCallAccess, reserr.ErrInvalidParams, http.StatusBadRequest, reserr.ErrInvalidParams},
{nil, fullCallAccess, reserr.ErrMethodNotFound, http.StatusNotFound, reserr.ErrMethodNotFound},
{nil, fullCallAccess, nil, http.StatusNoContent, []byte{}},
}

Expand Down
22 changes: 15 additions & 7 deletions test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,29 @@ func subscribeToTestQueryModel(t *testing.T, s *Session, c *Conn, q, normq strin
panic("test: failed to marshal normalized query: " + err.Error())
}

qj, err := json.Marshal("test.model?" + q)
rid := "test.model"
if q != "" {
rid += "?" + q
}
qj, err := json.Marshal(rid)
if err != nil {
panic("test: failed to marshal query: " + err.Error())
}

// Send subscribe request
creq := c.Request("subscribe.test.model?"+q, nil)
creq := c.Request("subscribe."+rid, nil)

// Handle model get and access request
mreqs := s.GetParallelRequests(t, 2)
mreqs.
GetRequest(t, "get.test.model").
AssertPathPayload(t, "query", q).
RespondSuccess(json.RawMessage(`{"model":` + model + `,"query":` + string(normqj) + `}`))
req := mreqs.GetRequest(t, "access.test.model").AssertPathPayload(t, "query", q)
req := mreqs.GetRequest(t, "get.test.model")
if q != "" {
req.AssertPathPayload(t, "query", q)
}
req.RespondSuccess(json.RawMessage(`{"model":` + model + `,"query":` + string(normqj) + `}`))
req = mreqs.GetRequest(t, "access.test.model")
if q != "" {
req.AssertPathPayload(t, "query", q)
}
cid := req.PathPayload(t, "cid").(string)
req.RespondSuccess(json.RawMessage(`{"get":true}`))

Expand Down

0 comments on commit 7d1d52b

Please sign in to comment.