Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc/subscription): subscribe runtime version notify when version changes #1686

Merged
merged 56 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
7325277
add code for runtime version changed fix
edwardmack Jul 2, 2021
2d324de
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 7, 2021
9394d38
fix return type
edwardmack Jul 7, 2021
cfe9117
update mock in Makefile
edwardmack Jul 7, 2021
37c8878
fix core_api mock
EclesioMeloJunior Jul 8, 2021
fb75b48
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 8, 2021
feb5f4d
implement notifications for runtime updates
edwardmack Jul 8, 2021
a31474d
lint
edwardmack Jul 8, 2021
3017ed3
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 9, 2021
fd83528
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 9, 2021
432d604
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 9, 2021
0dcc08f
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 12, 2021
d7c5d59
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 14, 2021
8c16176
implement unregister runtime version listener
edwardmack Jul 14, 2021
de0c40b
make mocks and lint
edwardmack Jul 15, 2021
5512348
refactor runtime subscription name, change error handeling
edwardmack Jul 15, 2021
09c9b2e
remove unneeded select case
edwardmack Jul 15, 2021
cf0e477
lint, refactor GetID to GetChannelID
edwardmack Jul 15, 2021
6c3ea54
lint
edwardmack Jul 15, 2021
ade2a82
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 15, 2021
c0eb904
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 20, 2021
8031b82
add sync.WaitGroup to notify Runtime Updated
edwardmack Jul 21, 2021
d6f6cea
add test for Register UnRegister Runtime Update Channel
edwardmack Jul 21, 2021
2f6c7e2
add check for id
edwardmack Jul 21, 2021
98fb9d9
add register panic test
edwardmack Jul 21, 2021
42f436e
add test to produce panic
edwardmack Jul 21, 2021
125a5d1
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 21, 2021
0e2fcd7
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 22, 2021
7e7e9bd
generate id as uuid, add locks, delete from map
edwardmack Jul 23, 2021
66bd9e2
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 23, 2021
ff35489
update runtime updated channel ids to use uint32
edwardmack Jul 23, 2021
70d6c27
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 23, 2021
cecc811
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 27, 2021
c8d8d92
change logging to debug
edwardmack Jul 27, 2021
4747f96
remove comment
edwardmack Jul 27, 2021
c0cbdc0
move uuid into check loop
edwardmack Jul 27, 2021
b70b82c
update runtime test
edwardmack Jul 27, 2021
fcac2b5
add test for notify runtime updated
edwardmack Jul 27, 2021
a41d0af
update runtime tests
edwardmack Jul 27, 2021
568ed88
move runtime notify from coreAPI to blockStateAPI
edwardmack Jul 27, 2021
4fd52d8
fix mocks for tests
edwardmack Jul 27, 2021
5e7fba5
refactor state_test
edwardmack Jul 28, 2021
6038b6c
add tests
edwardmack Jul 29, 2021
ea37d63
lint
edwardmack Jul 29, 2021
dc78e71
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Jul 29, 2021
0de3c60
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 4, 2021
739f8aa
remove commented code, fix var assignment
edwardmack Aug 4, 2021
e3e5cef
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 4, 2021
39f4a88
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 5, 2021
cb364a9
fix merge conflict
edwardmack Aug 5, 2021
84f89d3
add check if channel is ok
edwardmack Aug 5, 2021
1a877ac
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 6, 2021
6270ede
update unit test
edwardmack Aug 6, 2021
9be6179
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 9, 2021
a22fca9
send notification as go routine
edwardmack Aug 9, 2021
154061f
Merge branch 'development' into ed/fix_state_subscribeRuntimeVersion
edwardmack Aug 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions dot/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package core
import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"os"
"sync"

Expand Down Expand Up @@ -69,6 +71,9 @@ type Service struct {

// Keystore
keys *keystore.GlobalKeystore

runtimeChangedLock sync.RWMutex
runtimeChanged map[byte]chan<- runtime.Version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename runtimeUpdateSubscriptions or some thing like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to runtimeUpdateSubscriptions and runtimeUpdateSubscriptionsLock

}

// Config holds the configuration for the core Service.
Expand Down Expand Up @@ -151,6 +156,7 @@ func NewService(cfg *Config) (*Service, error) {
codeSubstitute: cfg.CodeSubstitutes,
codeSubstitutedState: cfg.CodeSubstitutedState,
digestHandler: cfg.DigestHandler,
runtimeChanged: make(map[byte]chan<- runtime.Version),
}

return srv, nil
Expand Down Expand Up @@ -308,6 +314,11 @@ func (s *Service) handleRuntimeChanges(newState *rtstorage.TrieState) error {
return err
}

ver, err := s.rt.Version()
if err == nil {
go s.notifyRuntimeUpdated(ver)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this errors that is bad, so should handle that error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to return err here.


s.codeHash = currCodeHash

err = s.codeSubstitutedState.StoreCodeSubstitutedBlockHash(common.Hash{})
Expand Down Expand Up @@ -462,6 +473,25 @@ func (s *Service) handleChainReorg(prev, curr common.Hash) error {
return nil
}

func (s *Service) notifyRuntimeUpdated(version runtime.Version) {
s.runtimeChangedLock.RLock()
defer s.runtimeChangedLock.RUnlock()

if len(s.runtimeChanged) == 0 {
return
}

logger.Info("notifying runtime updated chans...", "chans", s.runtimeChanged)
for _, ch := range s.runtimeChanged {
go func(ch chan<- runtime.Version) {
select {
case ch <- version:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the channel being written to as a select case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, no reason, so I've removed the select case.

default:
}
}(ch)
}
}

// maintainTransactionPool removes any transactions that were included in the new block, revalidates the transactions in the pool,
// and moves them to the queue if valid.
// See https://github.com/paritytech/substrate/blob/74804b5649eccfb83c90aec87bdca58e5d5c8789/client/transaction-pool/src/lib.rs#L545
Expand Down Expand Up @@ -579,3 +609,40 @@ func (s *Service) GetMetadata(bhash *common.Hash) ([]byte, error) {
s.rt.SetContextStorage(ts)
return s.rt.Metadata()
}

// RegisterRuntimeUpdatedChannel function to register chan that is notified when runtime version changes
func (s *Service) RegisterRuntimeUpdatedChannel(ch chan<- runtime.Version) (byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if we have an Unregister method that closes the channel and remove it from the map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added Unregister method.

s.runtimeChangedLock.Lock()
defer s.runtimeChangedLock.Unlock()

if len(s.runtimeChanged) == 256 {
return 0, errors.New("channel limit reached")
noot marked this conversation as resolved.
Show resolved Hide resolved
}

var id byte
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generateID should probably be a receiver function on the Service itself and you can check if there are collisions in the map so you don't have to loop here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that seems better.

id = generateID()
if s.runtimeChanged[id] == nil {
break
}
}

s.runtimeChanged[id] = ch
return id, nil
}

func (s *Service) UnregisterRuntimeUpdatedChannel (id byte) bool {
ch := s.runtimeChanged[id]
if ch != nil {
close(ch)
s.runtimeChanged[id] = nil
return true
}
return false
}

func generateID() byte {
// skipcq: GSC-G404
id := rand.Intn(256) //nolint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use a uuid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use uuid, do I still need to do the loop check since this is a uuid?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea you should still check even though it's extremely unlikely.

return byte(id)
}
2 changes: 2 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type CoreAPI interface {
GetRuntimeVersion(bhash *common.Hash) (runtime.Version, error)
HandleSubmittedExtrinsic(types.Extrinsic) error
GetMetadata(bhash *common.Hash) ([]byte, error)
RegisterRuntimeUpdatedChannel(ch chan<- runtime.Version) (byte, error)
UnregisterRuntimeUpdatedChannel(id byte) bool
}

// RPCAPI is the interface for methods related to RPC service
Expand Down
17 changes: 16 additions & 1 deletion dot/rpc/modules/api_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package modules
import (
modulesmocks "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks"
"github.com/ChainSafe/gossamer/lib/common"
runtimemocks "github.com/ChainSafe/gossamer/lib/runtime/mocks"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -42,9 +43,23 @@ func NewMockCoreAPI() *modulesmocks.MockCoreAPI {
m := new(modulesmocks.MockCoreAPI)
m.On("InsertKey", mock.AnythingOfType("crypto.Keypair"))
m.On("HasKey", mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(false, nil)
m.On("GetRuntimeVersion", mock.AnythingOfType("*common.Hash")).Return(nil, nil)
m.On("GetRuntimeVersion", mock.AnythingOfType("*common.Hash")).Return(NewMockVersion(), nil)
m.On("IsBlockProducer").Return(false)
m.On("HandleSubmittedExtrinsic", mock.AnythingOfType("types.Extrinsic")).Return(nil)
m.On("GetMetadata", mock.AnythingOfType("*common.Hash")).Return(nil, nil)
m.On("RegisterRuntimeUpdatedChannel", mock.AnythingOfType("chan<- runtime.Version")).Return(byte(0), nil)
return m
}

// NewMockVersion creates and returns an runtime Version interface mock
func NewMockVersion() *runtimemocks.MockVersion {
m := new(runtimemocks.MockVersion)
m.On("SpecName").Return([]byte(`mock-spec`))
m.On("ImplName").Return(nil)
m.On("AuthoringVersion").Return(uint32(0))
m.On("SpecVersion").Return(uint32(0))
m.On("ImplVersion").Return(uint32(0))
m.On("TransactionVersion").Return(uint32(0))
m.On("APIItems").Return(nil)
return m
}
23 changes: 15 additions & 8 deletions dot/rpc/modules/mocks/core_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 34 additions & 7 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
)

// Listener interface for functions that define Listener related functions
Expand Down Expand Up @@ -258,20 +259,25 @@ func (l *ExtrinsicSubmitListener) Stop() { l.cancel() }

// RuntimeVersionListener to handle listening for Runtime Version
type RuntimeVersionListener struct {
wsconn *WSConn
subID uint
wsconn WSConnAPI
subID uint
runtimeUpdate chan runtime.Version
channelID byte
coreAPI modules.CoreAPI
}

type VersionListener interface {
GetID() byte
}

// Listen implementation of Listen interface to listen for runtime version changes
func (l *RuntimeVersionListener) Listen() {
// This sends current runtime version once when subscription is created
// TODO (ed) add logic to send updates when runtime version changes
rtVersion, err := l.wsconn.CoreAPI.GetRuntimeVersion(nil)
rtVersion, err := l.coreAPI.GetRuntimeVersion(nil)
if err != nil {
return
}
ver := modules.StateRuntimeVersionResponse{}

ver.SpecName = string(rtVersion.SpecName())
ver.ImplName = string(rtVersion.ImplName())
ver.AuthoringVersion = rtVersion.AuthoringVersion()
Expand All @@ -280,9 +286,30 @@ func (l *RuntimeVersionListener) Listen() {
ver.TransactionVersion = rtVersion.TransactionVersion()
ver.Apis = modules.ConvertAPIs(rtVersion.APIItems())

l.wsconn.safeSend(newSubscriptionResponse("state_runtimeVersion", l.subID, ver))
go l.wsconn.safeSend(newSubscriptionResponse("state_runtimeVersion", l.subID, ver))

// listen for runtime updates
go func() {
for info := range l.runtimeUpdate {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for info := range l.runtimeUpdate {
for info, ok := range l.runtimeUpdate {
if !ok {
return
}
// rest of code

add closed channel check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I try to add ok assignment as above, I get compile error too many variables in range, is there another way to check this?

Copy link
Contributor

@noot noot Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nvm I missed that this is a range, you'd have to change it to read from the channel in a for loop

for {
    info, ok := <-l.runtimeUpdate {
    if !ok {
        return
    }
    //rest of code
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pinging on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

ver := modules.StateRuntimeVersionResponse{}

ver.SpecName = string(info.SpecName())
ver.ImplName = string(info.ImplName())
ver.AuthoringVersion = info.AuthoringVersion()
ver.SpecVersion = info.SpecVersion()
ver.ImplVersion = info.ImplVersion()
ver.TransactionVersion = info.TransactionVersion()
ver.Apis = modules.ConvertAPIs(info.APIItems())

l.wsconn.safeSend(newSubscriptionResponse("state_runtimeVersion", l.subID, ver))
}
}()
}

// Stop to runtimeVersionListener not implemented yet because the listener
// does not need to be stoped
// does not need to be stopped
func (l *RuntimeVersionListener) Stop() {}

func (l *RuntimeVersionListener) GetID() byte {
return l.channelID
}
59 changes: 59 additions & 0 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package subscription

import (
"io/ioutil"
"math/big"
"path/filepath"
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/runtime/wasmer"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -159,3 +163,58 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
expectedFinalizedRespones := newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, resFinalised)
require.Equal(t, expectedFinalizedRespones, mockConnection.lastMessage)
}

func TestRuntimeChannelListener_Listen(t *testing.T) {
notifyChan := make(chan runtime.Version)
mockConnection := &MockWSConnAPI{}
rvl := RuntimeVersionListener{
wsconn: mockConnection,
subID: 0,
runtimeUpdate: notifyChan,
coreAPI: modules.NewMockCoreAPI(),
}

expectedInitialVersion := modules.StateRuntimeVersionResponse{
SpecName: "mock-spec",
Apis: modules.ConvertAPIs(nil),
}

expectedInitialResponse := newSubcriptionBaseResponseJSON()
expectedInitialResponse.Method = "state_runtimeVersion"
expectedInitialResponse.Params.Result = expectedInitialVersion

instance := wasmer.NewTestInstance(t, runtime.NODE_RUNTIME)
_, err := runtime.GetRuntimeBlob(runtime.POLKADOT_RUNTIME_FP, runtime.POLKADOT_RUNTIME_URL)
require.NoError(t, err)
fp, err := filepath.Abs(runtime.POLKADOT_RUNTIME_FP)
require.NoError(t, err)
code, err := ioutil.ReadFile(fp)
require.NoError(t, err)
version, err := instance.CheckRuntimeVersion(code)
require.NoError(t, err)

expectedUpdatedVersion := modules.StateRuntimeVersionResponse{
SpecName: "polkadot",
ImplName: "parity-polkadot",
AuthoringVersion: 0,
SpecVersion: 25,
ImplVersion: 0,
TransactionVersion: 5,
Apis: modules.ConvertAPIs(version.APIItems()),
}

expectedUpdateResponse := newSubcriptionBaseResponseJSON()
expectedUpdateResponse.Method = "state_runtimeVersion"
expectedUpdateResponse.Params.Result = expectedUpdatedVersion

go rvl.Listen()

//check initial response
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedInitialResponse, mockConnection.lastMessage)

// check response after update
notifyChan <- version
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedUpdateResponse, mockConnection.lastMessage)
}
30 changes: 26 additions & 4 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
log "github.com/ChainSafe/log15"
"github.com/gorilla/websocket"
)
Expand Down Expand Up @@ -345,17 +346,25 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener
}

func (c *WSConn) initRuntimeVersionListener(reqID float64, _ interface{}) (Listener, error) {
rvl := &RuntimeVersionListener{
wsconn: c,
}

if c.CoreAPI == nil {
c.safeSendError(reqID, nil, "error CoreAPI not set")
return nil, fmt.Errorf("error CoreAPI not set")
}

rvl := &RuntimeVersionListener{
wsconn: c,
runtimeUpdate: make(chan runtime.Version),
coreAPI: c.CoreAPI,
}

chanID, err := c.CoreAPI.RegisterRuntimeUpdatedChannel(rvl.runtimeUpdate)
if err != nil {
return nil, err
}

c.mu.Lock()

rvl.channelID = chanID
c.qtyListeners++
rvl.subID = c.qtyListeners
c.Subscriptions[rvl.subID] = rvl
Expand All @@ -367,6 +376,19 @@ func (c *WSConn) initRuntimeVersionListener(reqID float64, _ interface{}) (Liste
return rvl, nil
}

func (c *WSConn) unsubscribeRuntimeVersionListener(reqID float64, l Listener, _ interface{}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to register this unsubscribe function to a webscoket method here ./dot/rpc/subscription/subscription.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I've added it.

observer, ok := l.(VersionListener)
if !ok {
initRes := newBooleanResponseJSON(false, reqID)
c.safeSend(initRes)
return
}
id := observer.GetID()

res := c.CoreAPI.UnregisterRuntimeUpdatedChannel(id)
c.safeSend(newBooleanResponseJSON(res, reqID))
}

func (c *WSConn) safeSend(msg interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
Loading