Skip to content

Commit

Permalink
feature(rpc): handle extrinsic status future and ready for rpc call a…
Browse files Browse the repository at this point in the history
…uthor_submitAndWatchExtrinsic (#1921)

handle extrinsic status future and ready

Through `author_submitAndWatchExtrinsic` rpc subscription call, we will now be able to track when the submitted transaction(extrinsic) gets into future queue (tx pool) and ready queue (tx priority queue).
  • Loading branch information
kishansagathiya authored Nov 9, 2021
1 parent 55d997f commit 1a6b2e4
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 62 deletions.
2 changes: 2 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type TransactionStateAPI interface {
Pop() *transaction.ValidTransaction
Peek() *transaction.ValidTransaction
Pending() []*transaction.ValidTransaction
GetStatusNotifierChannel(ext types.Extrinsic) chan transaction.Status
FreeStatusNotifierChannel(ch chan transaction.Status)
}

//go:generate mockery --name CoreAPI --structname CoreAPI --case underscore --keeptree
Expand Down
10 changes: 10 additions & 0 deletions dot/rpc/modules/api_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
runtimemocks "github.com/ChainSafe/gossamer/lib/runtime/mocks"
"github.com/ChainSafe/gossamer/lib/transaction"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -43,6 +44,15 @@ func NewMockBlockAPI() *modulesmocks.BlockAPI {
return m
}

// NewMockTransactionStateAPI creates and return an rpc TransactionStateAPI interface mock
func NewMockTransactionStateAPI() *modulesmocks.TransactionStateAPI {
m := new(modulesmocks.TransactionStateAPI)
m.On("FreeStatusNotifierChannel", mock.AnythingOfType("chan transaction.Status"))
m.On("GetStatusNotifierChannel", mock.AnythingOfType("types.Extrinsic")).Return(make(chan transaction.Status))
m.On("AddToPool", mock.AnythingOfType("transaction.ValidTransaction")).Return(common.Hash{})
return m
}

// NewMockCoreAPI creates and return an rpc CoreAPI interface mock
func NewMockCoreAPI() *modulesmocks.CoreAPI {
m := new(modulesmocks.CoreAPI)
Expand Down
23 changes: 23 additions & 0 deletions dot/rpc/modules/mocks/transaction_state_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 26 additions & 11 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/transaction"
)

const (
Expand Down Expand Up @@ -57,13 +58,6 @@ type WSConnAPI interface {
safeSend(interface{})
}

// StorageObserver struct to hold data for observer (Observer Design Pattern)
type StorageObserver struct {
id uint32
filter map[string][]byte
wsconn *WSConn
}

// Change type defining key value pair representing change
type Change [2]string

Expand All @@ -73,6 +67,13 @@ type ChangeResult struct {
Block string `json:"block"`
}

// StorageObserver struct to hold data for observer (Observer Design Pattern)
type StorageObserver struct {
id uint32
filter map[string][]byte
wsconn *WSConn
}

// Update is called to notify observer of new value
func (s *StorageObserver) Update(change *state.SubscriptionResult) {
if change == nil {
Expand Down Expand Up @@ -309,22 +310,28 @@ type ExtrinsicSubmitListener struct {
importedChan chan *types.Block
importedHash common.Hash
finalisedChan chan *types.FinalisationInfo
// txStatusChan is used to know when transaction/extrinsic becomes part of the
// ready queue or future queue.
// we are using transaction.PriorityQueue for ready queue and transaction.Pool
// for future queue.
txStatusChan chan transaction.Status
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

// NewExtrinsicSubmitListener constructor to build new ExtrinsicSubmitListener
func NewExtrinsicSubmitListener(conn *WSConn, extBytes []byte) *ExtrinsicSubmitListener {
esl := &ExtrinsicSubmitListener{
func NewExtrinsicSubmitListener(conn *WSConn, extBytes []byte, importedChan chan *types.Block, txStatusChan chan transaction.Status, finalisedChan chan *types.FinalisationInfo) *ExtrinsicSubmitListener {
return &ExtrinsicSubmitListener{
wsconn: conn,
extrinsic: types.Extrinsic(extBytes),
finalisedChan: make(chan *types.FinalisationInfo),
importedChan: importedChan,
txStatusChan: txStatusChan,
finalisedChan: finalisedChan,
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
}
return esl
}

// Listen implementation of Listen interface to listen for importedChan changes
Expand All @@ -334,8 +341,10 @@ func (l *ExtrinsicSubmitListener) Listen() {
defer func() {
l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan)
l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalisedChan)
l.wsconn.TxStateAPI.FreeStatusNotifierChannel(l.txStatusChan)
close(l.done)
close(l.finalisedChan)
close(l.txStatusChan)
}()

for {
Expand Down Expand Up @@ -373,6 +382,12 @@ func (l *ExtrinsicSubmitListener) Listen() {
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, l.subID, resM))
}
case txStatus, ok := <-l.txStatusChan:
if !ok {
return
}

l.wsconn.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, l.subID, txStatus.String()))
}
}
}()
Expand Down
6 changes: 6 additions & 0 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ChainSafe/gossamer/lib/grandpa"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/runtime/wasmer"
"github.com/ChainSafe/gossamer/lib/transaction"
"github.com/ChainSafe/gossamer/pkg/scale"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -194,16 +195,21 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {

notifyImportedChan := make(chan *types.Block, 100)
notifyFinalizedChan := make(chan *types.FinalisationInfo, 100)
txStatusChan := make(chan transaction.Status)

BlockAPI := new(mocks.BlockAPI)
BlockAPI.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
BlockAPI.On("FreeFinalisedNotifierChannel", mock.AnythingOfType("chan *types.FinalisationInfo"))

wsconn.BlockAPI = BlockAPI

TxStateAPI := modules.NewMockTransactionStateAPI()
wsconn.TxStateAPI = TxStateAPI

esl := ExtrinsicSubmitListener{
importedChan: notifyImportedChan,
finalisedChan: notifyFinalizedChan,
txStatusChan: txStatusChan,
wsconn: wsconn,
extrinsic: types.Extrinsic{1, 2, 3},
cancel: make(chan struct{}),
Expand Down
3 changes: 2 additions & 1 deletion dot/rpc/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strconv"
)

// RPC methods
const (
authorSubmitAndWatchExtrinsic string = "author_submitAndWatchExtrinsic" //nolint
authorSubmitAndWatchExtrinsic string = "author_submitAndWatchExtrinsic"
chainSubscribeNewHeads string = "chain_subscribeNewHeads"
chainSubscribeNewHead string = "chain_subscribeNewHead"
chainSubscribeFinalizedHeads string = "chain_subscribeFinalizedHeads"
Expand Down
63 changes: 36 additions & 27 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync/atomic"

"github.com/ChainSafe/gossamer/dot/rpc/modules"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
log "github.com/ChainSafe/log15"
Expand Down Expand Up @@ -58,8 +59,7 @@ type WSConn struct {
CoreAPI modules.CoreAPI
TxStateAPI modules.TransactionStateAPI
RPCHost string

HTTP httpclient
HTTP httpclient
}

// readWebsocketMessage will read and parse the message data to a string->interface{} data
Expand All @@ -70,7 +70,7 @@ func (c *WSConn) readWebsocketMessage() ([]byte, map[string]interface{}, error)
return nil, nil, errCannotReadFromWebsocket
}

logger.Trace("websocket received", "message", mbytes)
logger.Trace("websocket received", "message", string(mbytes))

// determine if request is for subscribe method type
var msg map[string]interface{}
Expand Down Expand Up @@ -104,14 +104,14 @@ func (c *WSConn) HandleComm() {
logger.Debug("ws method called", "method", method, "params", params)

if !strings.Contains(method, "_unsubscribe") && !strings.Contains(method, "_unwatch") {
setup := c.getSetupListener(method)
setupListener := c.getSetupListener(method)

if setup == nil {
if setupListener == nil {
c.executeRPCCall(mbytes)
continue
}

listener, err := setup(reqid, params) //nolint
listener, err := setupListener(reqid, params) //nolint
if err != nil {
logger.Warn("failed to create listener", "method", method, "error", err)
continue
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *WSConn) initBlockListener(reqID float64, _ interface{}) (Listener, erro
}

func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Listener, error) {
bfl := &BlockFinalizedListener{
blockFinalizedListener := &BlockFinalizedListener{
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
Expand All @@ -246,19 +246,19 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Liste
return nil, fmt.Errorf("error BlockAPI not set")
}

bfl.channel = c.BlockAPI.GetFinalisedNotifierChannel()
blockFinalizedListener.channel = c.BlockAPI.GetFinalisedNotifierChannel()

c.mu.Lock()

bfl.subID = atomic.AddUint32(&c.qtyListeners, 1)
c.Subscriptions[bfl.subID] = bfl
blockFinalizedListener.subID = atomic.AddUint32(&c.qtyListeners, 1)
c.Subscriptions[blockFinalizedListener.subID] = blockFinalizedListener

c.mu.Unlock()

initRes := NewSubscriptionResponseJSON(bfl.subID, reqID)
initRes := NewSubscriptionResponseJSON(blockFinalizedListener.subID, reqID)
c.safeSend(initRes)

return bfl, nil
return blockFinalizedListener, nil
}

func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener, error) {
Expand All @@ -283,42 +283,51 @@ func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener,

func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener, error) {
pA := params.([]interface{})

if len(pA) != 1 {
return nil, errors.New("expecting only one parameter")
}

// The passed parameter should be a HEX of a SCALE encoded extrinsic
extBytes, err := common.HexToBytes(pA[0].(string))
if err != nil {
return nil, err
}

// listen for built blocks
esl := NewExtrinsicSubmitListener(c, extBytes)

if c.BlockAPI == nil {
return nil, fmt.Errorf("error BlockAPI not set")
}

esl.importedChan = c.BlockAPI.GetImportedBlockNotifierChannel()
txStatusChan := c.TxStateAPI.GetStatusNotifierChannel(extBytes)
importedChan := c.BlockAPI.GetImportedBlockNotifierChannel()
finalizedChan := c.BlockAPI.GetFinalisedNotifierChannel()

esl.finalisedChan = c.BlockAPI.GetFinalisedNotifierChannel()
extSubmitListener := NewExtrinsicSubmitListener(
c,
extBytes,
importedChan,
txStatusChan,
finalizedChan,
)

c.mu.Lock()

esl.subID = atomic.AddUint32(&c.qtyListeners, 1)
c.Subscriptions[esl.subID] = esl

extSubmitListener.subID = atomic.AddUint32(&c.qtyListeners, 1)
c.Subscriptions[extSubmitListener.subID] = extSubmitListener
c.mu.Unlock()

err = c.CoreAPI.HandleSubmittedExtrinsic(extBytes)
if err != nil {
if errors.Is(err, runtime.ErrInvalidTransaction) || errors.Is(err, runtime.ErrUnknownTransaction) {
c.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, extSubmitListener.subID, "invalid"))
return nil, err
} else if err != nil {
c.safeSendError(reqID, nil, err.Error())
return nil, err
}
c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID))

// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
// should we add a channel to tx queue so we're notified when it's in the queue (#1535)
c.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, esl.subID, "ready"))
c.safeSend(NewSubscriptionResponseJSON(extSubmitListener.subID, reqID))

// todo (ed) determine which peer extrinsic has been broadcast to, and set status (#1535)
return esl, err
return extSubmitListener, err
}

func (c *WSConn) initRuntimeVersionListener(reqID float64, _ interface{}) (Listener, error) {
Expand Down
Loading

0 comments on commit 1a6b2e4

Please sign in to comment.