Skip to content

Commit

Permalink
feat(rpc/subscription): implement state_unsubscribeStorage (ChainSafe…
Browse files Browse the repository at this point in the history
…#1574)

* implement state_unsubscribeStorage

* add value checks, add tests

* handle string parameter, add tests, use const for error messages

* parse to uint

* update type

* update variable names (based on comments)

Co-authored-by: Arijit Das <[email protected]>
  • Loading branch information
2 people authored and timwu20 committed Dec 6, 2021
1 parent 3802780 commit 56f594a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 6 deletions.
24 changes: 23 additions & 1 deletion dot/rpc/subscription/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type Params struct {
SubscriptionID uint `json:"subscription"`
}

// InvalidRequestCode error code returned for invalid request parameters, value derived from Substrate node output
const InvalidRequestCode = -32600

// InvalidRequestMessage error message for invalid request parameters
const InvalidRequestMessage = "Invalid request"

func newSubcriptionBaseResponseJSON() BaseResponseJSON {
return BaseResponseJSON{
Jsonrpc: "2.0",
Expand All @@ -52,10 +58,26 @@ type ResponseJSON struct {
ID float64 `json:"id"`
}

func newSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
// NewSubscriptionResponseJSON builds a Response JSON object
func NewSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON {
return ResponseJSON{
Jsonrpc: "2.0",
Result: subID,
ID: reqID,
}
}

// BooleanResponse for responses that return boolean values
type BooleanResponse struct {
JSONRPC string `json:"jsonrpc"`
Result bool `json:"result"`
ID float64 `json:"id"`
}

func newBooleanResponseJSON(value bool, reqID float64) BooleanResponse {
return BooleanResponse{
JSONRPC: "2.0",
Result: value,
ID: reqID,
}
}
54 changes: 49 additions & 5 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"io/ioutil"
"math/big"
"net/http"
"strconv"
"strings"
"sync"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
log "github.com/ChainSafe/log15"
Expand Down Expand Up @@ -106,6 +108,9 @@ func (c *WSConn) HandleComm() {
continue
}
c.startListener(rvl)
case "state_unsubscribeStorage":
c.unsubscribeStorageListener(reqid, params)

}
continue
}
Expand Down Expand Up @@ -206,12 +211,51 @@ func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (u

c.Subscriptions[myObs.id] = myObs

initRes := newSubscriptionResponseJSON(myObs.id, reqID)
initRes := NewSubscriptionResponseJSON(myObs.id, reqID)
c.safeSend(initRes)

return myObs.id, nil
}

func (c *WSConn) unsubscribeStorageListener(reqID float64, params interface{}) {
switch v := params.(type) {
case []interface{}:
if len(v) == 0 {
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}
default:
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}

var id uint
switch v := params.([]interface{})[0].(type) {
case float64:
id = uint(v)
case string:
i, err := strconv.ParseUint(v, 10, 32)
if err != nil {
c.safeSend(newBooleanResponseJSON(false, reqID))
return
}
id = uint(i)
default:
c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage)
return
}

observer, ok := c.Subscriptions[id].(state.Observer)
if !ok {
initRes := newBooleanResponseJSON(false, reqID)
c.safeSend(initRes)
return
}

c.StorageAPI.UnregisterStorageObserver(observer)
c.safeSend(newBooleanResponseJSON(true, reqID))
}

func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
bl := &BlockListener{
Channel: make(chan *types.Block),
Expand All @@ -231,7 +275,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {
bl.subID = c.qtyListeners
c.Subscriptions[bl.subID] = bl
c.BlockSubChannels[bl.subID] = chanID
initRes := newSubscriptionResponseJSON(bl.subID, reqID)
initRes := NewSubscriptionResponseJSON(bl.subID, reqID)
c.safeSend(initRes)

return bl.subID, nil
Expand All @@ -256,7 +300,7 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
bfl.subID = c.qtyListeners
c.Subscriptions[bfl.subID] = bfl
c.BlockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
initRes := NewSubscriptionResponseJSON(bfl.subID, reqID)
c.safeSend(initRes)

return bfl.subID, nil
Expand Down Expand Up @@ -299,7 +343,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
if err != nil {
return 0, err
}
c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID))
c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID))

// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
// should we add a channel to tx queue so we're notified when it's in the queue (See issue #1535)
Expand All @@ -322,7 +366,7 @@ func (c *WSConn) initRuntimeVersionListener(reqID float64) (uint, error) {
c.qtyListeners++
rvl.subID = c.qtyListeners
c.Subscriptions[rvl.subID] = rvl
initRes := newSubscriptionResponseJSON(rvl.subID, reqID)
initRes := NewSubscriptionResponseJSON(rvl.subID, reqID)
c.safeSend(initRes)

return rvl.subID, nil
Expand Down
55 changes: 55 additions & 0 deletions dot/rpc/subscription/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,61 @@ func TestWSConn_HandleComm(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":4,"id":7}`+"\n"), msg)

// test state_unsubscribeStorage
c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": "foo",
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": ["6"],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": ["4"],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [6],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg)

c.WriteMessage(websocket.TextMessage, []byte(`{
"jsonrpc": "2.0",
"method": "state_unsubscribeStorage",
"params": [4],
"id": 7}`))
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg)

// test initBlockListener
res, err = wsconn.initBlockListener(1)
require.EqualError(t, err, "error BlockAPI not set")
Expand Down

0 comments on commit 56f594a

Please sign in to comment.