diff --git a/dot/rpc/subscription/messages.go b/dot/rpc/subscription/messages.go index d6df1258dd..1e37f3f265 100644 --- a/dot/rpc/subscription/messages.go +++ b/dot/rpc/subscription/messages.go @@ -28,6 +28,12 @@ type Params struct { SubscriptionID uint `json:"subscription"` } +// InvalidRequestCode error code returned for invalid request parameters, value derived from Substrate node output +const InvalidRequestCode = -32600 + +// InvalidRequestMessage error message for invalid request parameters +const InvalidRequestMessage = "Invalid request" + func newSubcriptionBaseResponseJSON() BaseResponseJSON { return BaseResponseJSON{ Jsonrpc: "2.0", @@ -52,10 +58,26 @@ type ResponseJSON struct { ID float64 `json:"id"` } -func newSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON { +// NewSubscriptionResponseJSON builds a Response JSON object +func NewSubscriptionResponseJSON(subID uint, reqID float64) ResponseJSON { return ResponseJSON{ Jsonrpc: "2.0", Result: subID, ID: reqID, } } + +// BooleanResponse for responses that return boolean values +type BooleanResponse struct { + JSONRPC string `json:"jsonrpc"` + Result bool `json:"result"` + ID float64 `json:"id"` +} + +func newBooleanResponseJSON(value bool, reqID float64) BooleanResponse { + return BooleanResponse{ + JSONRPC: "2.0", + Result: value, + ID: reqID, + } +} diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 0a2cb1dd7e..911c2aba7c 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -23,10 +23,12 @@ import ( "io/ioutil" "math/big" "net/http" + "strconv" "strings" "sync" "github.com/ChainSafe/gossamer/dot/rpc/modules" + "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" log "github.com/ChainSafe/log15" @@ -106,6 +108,9 @@ func (c *WSConn) HandleComm() { continue } c.startListener(rvl) + case "state_unsubscribeStorage": + c.unsubscribeStorageListener(reqid, params) + } continue } @@ -206,12 +211,51 @@ func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (u c.Subscriptions[myObs.id] = myObs - initRes := newSubscriptionResponseJSON(myObs.id, reqID) + initRes := NewSubscriptionResponseJSON(myObs.id, reqID) c.safeSend(initRes) return myObs.id, nil } +func (c *WSConn) unsubscribeStorageListener(reqID float64, params interface{}) { + switch v := params.(type) { + case []interface{}: + if len(v) == 0 { + c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + return + } + default: + c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + return + } + + var id uint + switch v := params.([]interface{})[0].(type) { + case float64: + id = uint(v) + case string: + i, err := strconv.ParseUint(v, 10, 32) + if err != nil { + c.safeSend(newBooleanResponseJSON(false, reqID)) + return + } + id = uint(i) + default: + c.safeSendError(reqID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + return + } + + observer, ok := c.Subscriptions[id].(state.Observer) + if !ok { + initRes := newBooleanResponseJSON(false, reqID) + c.safeSend(initRes) + return + } + + c.StorageAPI.UnregisterStorageObserver(observer) + c.safeSend(newBooleanResponseJSON(true, reqID)) +} + func (c *WSConn) initBlockListener(reqID float64) (uint, error) { bl := &BlockListener{ Channel: make(chan *types.Block), @@ -231,7 +275,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) { bl.subID = c.qtyListeners c.Subscriptions[bl.subID] = bl c.BlockSubChannels[bl.subID] = chanID - initRes := newSubscriptionResponseJSON(bl.subID, reqID) + initRes := NewSubscriptionResponseJSON(bl.subID, reqID) c.safeSend(initRes) return bl.subID, nil @@ -256,7 +300,7 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) { bfl.subID = c.qtyListeners c.Subscriptions[bfl.subID] = bfl c.BlockSubChannels[bfl.subID] = chanID - initRes := newSubscriptionResponseJSON(bfl.subID, reqID) + initRes := NewSubscriptionResponseJSON(bfl.subID, reqID) c.safeSend(initRes) return bfl.subID, nil @@ -299,7 +343,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er if err != nil { return 0, err } - c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID)) + c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID)) // TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue // should we add a channel to tx queue so we're notified when it's in the queue (See issue #1535) @@ -322,7 +366,7 @@ func (c *WSConn) initRuntimeVersionListener(reqID float64) (uint, error) { c.qtyListeners++ rvl.subID = c.qtyListeners c.Subscriptions[rvl.subID] = rvl - initRes := newSubscriptionResponseJSON(rvl.subID, reqID) + initRes := NewSubscriptionResponseJSON(rvl.subID, reqID) c.safeSend(initRes) return rvl.subID, nil diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index 735df61fac..a469e2142e 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -113,6 +113,61 @@ func TestWSConn_HandleComm(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(`{"jsonrpc":"2.0","result":4,"id":7}`+"\n"), msg) + // test state_unsubscribeStorage + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": "foo", + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg) + + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": [], + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":7}`+"\n"), msg) + + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": ["6"], + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg) + + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": ["4"], + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg) + + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": [6], + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","result":false,"id":7}`+"\n"), msg) + + c.WriteMessage(websocket.TextMessage, []byte(`{ + "jsonrpc": "2.0", + "method": "state_unsubscribeStorage", + "params": [4], + "id": 7}`)) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","result":true,"id":7}`+"\n"), msg) + // test initBlockListener res, err = wsconn.initBlockListener(1) require.EqualError(t, err, "error BlockAPI not set")