diff --git a/dot/rpc/modules/api.go b/dot/rpc/modules/api.go index 41556bd1d3..e29c7caece 100644 --- a/dot/rpc/modules/api.go +++ b/dot/rpc/modules/api.go @@ -86,6 +86,8 @@ type TransactionStateAPI interface { Pop() *transaction.ValidTransaction Peek() *transaction.ValidTransaction Pending() []*transaction.ValidTransaction + GetStatusNotifierChannel(ext types.Extrinsic) chan transaction.Status + FreeStatusNotifierChannel(ch chan transaction.Status) } //go:generate mockery --name CoreAPI --structname CoreAPI --case underscore --keeptree diff --git a/dot/rpc/modules/api_mocks.go b/dot/rpc/modules/api_mocks.go index 1aac332be6..43d302c365 100644 --- a/dot/rpc/modules/api_mocks.go +++ b/dot/rpc/modules/api_mocks.go @@ -5,6 +5,7 @@ import ( "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" runtimemocks "github.com/ChainSafe/gossamer/lib/runtime/mocks" + "github.com/ChainSafe/gossamer/lib/transaction" "github.com/stretchr/testify/mock" ) @@ -43,6 +44,15 @@ func NewMockBlockAPI() *modulesmocks.BlockAPI { return m } +// NewMockTransactionStateAPI creates and return an rpc TransactionStateAPI interface mock +func NewMockTransactionStateAPI() *modulesmocks.TransactionStateAPI { + m := new(modulesmocks.TransactionStateAPI) + m.On("FreeStatusNotifierChannel", mock.AnythingOfType("chan transaction.Status")) + m.On("GetStatusNotifierChannel", mock.AnythingOfType("types.Extrinsic")).Return(make(chan transaction.Status)) + m.On("AddToPool", mock.AnythingOfType("transaction.ValidTransaction")).Return(common.Hash{}) + return m +} + // NewMockCoreAPI creates and return an rpc CoreAPI interface mock func NewMockCoreAPI() *modulesmocks.CoreAPI { m := new(modulesmocks.CoreAPI) diff --git a/dot/rpc/modules/mocks/transaction_state_api.go b/dot/rpc/modules/mocks/transaction_state_api.go index 366941acd1..e07c4994ae 100644 --- a/dot/rpc/modules/mocks/transaction_state_api.go +++ b/dot/rpc/modules/mocks/transaction_state_api.go @@ -7,6 +7,8 @@ import ( mock "github.com/stretchr/testify/mock" transaction "github.com/ChainSafe/gossamer/lib/transaction" + + types "github.com/ChainSafe/gossamer/dot/types" ) // TransactionStateAPI is an autogenerated mock type for the TransactionStateAPI type @@ -30,6 +32,27 @@ func (_m *TransactionStateAPI) AddToPool(_a0 *transaction.ValidTransaction) comm return r0 } +// FreeStatusNotifierChannel provides a mock function with given fields: ch +func (_m *TransactionStateAPI) FreeStatusNotifierChannel(ch chan transaction.Status) { + _m.Called(ch) +} + +// GetStatusNotifierChannel provides a mock function with given fields: ext +func (_m *TransactionStateAPI) GetStatusNotifierChannel(ext types.Extrinsic) chan transaction.Status { + ret := _m.Called(ext) + + var r0 chan transaction.Status + if rf, ok := ret.Get(0).(func(types.Extrinsic) chan transaction.Status); ok { + r0 = rf(ext) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(chan transaction.Status) + } + } + + return r0 +} + // Peek provides a mock function with given fields: func (_m *TransactionStateAPI) Peek() *transaction.ValidTransaction { ret := _m.Called() diff --git a/dot/rpc/subscription/listeners.go b/dot/rpc/subscription/listeners.go index 32cab72054..66187212b3 100644 --- a/dot/rpc/subscription/listeners.go +++ b/dot/rpc/subscription/listeners.go @@ -27,6 +27,7 @@ import ( "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" + "github.com/ChainSafe/gossamer/lib/transaction" ) const ( @@ -57,13 +58,6 @@ type WSConnAPI interface { safeSend(interface{}) } -// StorageObserver struct to hold data for observer (Observer Design Pattern) -type StorageObserver struct { - id uint32 - filter map[string][]byte - wsconn *WSConn -} - // Change type defining key value pair representing change type Change [2]string @@ -73,6 +67,13 @@ type ChangeResult struct { Block string `json:"block"` } +// StorageObserver struct to hold data for observer (Observer Design Pattern) +type StorageObserver struct { + id uint32 + filter map[string][]byte + wsconn *WSConn +} + // Update is called to notify observer of new value func (s *StorageObserver) Update(change *state.SubscriptionResult) { if change == nil { @@ -309,22 +310,28 @@ type ExtrinsicSubmitListener struct { importedChan chan *types.Block importedHash common.Hash finalisedChan chan *types.FinalisationInfo + // txStatusChan is used to know when transaction/extrinsic becomes part of the + // ready queue or future queue. + // we are using transaction.PriorityQueue for ready queue and transaction.Pool + // for future queue. + txStatusChan chan transaction.Status done chan struct{} cancel chan struct{} cancelTimeout time.Duration } // NewExtrinsicSubmitListener constructor to build new ExtrinsicSubmitListener -func NewExtrinsicSubmitListener(conn *WSConn, extBytes []byte) *ExtrinsicSubmitListener { - esl := &ExtrinsicSubmitListener{ +func NewExtrinsicSubmitListener(conn *WSConn, extBytes []byte, importedChan chan *types.Block, txStatusChan chan transaction.Status, finalisedChan chan *types.FinalisationInfo) *ExtrinsicSubmitListener { + return &ExtrinsicSubmitListener{ wsconn: conn, extrinsic: types.Extrinsic(extBytes), - finalisedChan: make(chan *types.FinalisationInfo), + importedChan: importedChan, + txStatusChan: txStatusChan, + finalisedChan: finalisedChan, cancel: make(chan struct{}, 1), done: make(chan struct{}, 1), cancelTimeout: defaultCancelTimeout, } - return esl } // Listen implementation of Listen interface to listen for importedChan changes @@ -334,8 +341,10 @@ func (l *ExtrinsicSubmitListener) Listen() { defer func() { l.wsconn.BlockAPI.FreeImportedBlockNotifierChannel(l.importedChan) l.wsconn.BlockAPI.FreeFinalisedNotifierChannel(l.finalisedChan) + l.wsconn.TxStateAPI.FreeStatusNotifierChannel(l.txStatusChan) close(l.done) close(l.finalisedChan) + close(l.txStatusChan) }() for { @@ -373,6 +382,12 @@ func (l *ExtrinsicSubmitListener) Listen() { resM["finalised"] = info.Header.Hash().String() l.wsconn.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, l.subID, resM)) } + case txStatus, ok := <-l.txStatusChan: + if !ok { + return + } + + l.wsconn.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, l.subID, txStatus.String())) } } }() diff --git a/dot/rpc/subscription/listeners_test.go b/dot/rpc/subscription/listeners_test.go index 80fa402b79..fb1c10788e 100644 --- a/dot/rpc/subscription/listeners_test.go +++ b/dot/rpc/subscription/listeners_test.go @@ -37,6 +37,7 @@ import ( "github.com/ChainSafe/gossamer/lib/grandpa" "github.com/ChainSafe/gossamer/lib/runtime" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" + "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/pkg/scale" "github.com/gorilla/websocket" "github.com/stretchr/testify/mock" @@ -194,6 +195,7 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) { notifyImportedChan := make(chan *types.Block, 100) notifyFinalizedChan := make(chan *types.FinalisationInfo, 100) + txStatusChan := make(chan transaction.Status) BlockAPI := new(mocks.BlockAPI) BlockAPI.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block")) @@ -201,9 +203,13 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) { wsconn.BlockAPI = BlockAPI + TxStateAPI := modules.NewMockTransactionStateAPI() + wsconn.TxStateAPI = TxStateAPI + esl := ExtrinsicSubmitListener{ importedChan: notifyImportedChan, finalisedChan: notifyFinalizedChan, + txStatusChan: txStatusChan, wsconn: wsconn, extrinsic: types.Extrinsic{1, 2, 3}, cancel: make(chan struct{}), diff --git a/dot/rpc/subscription/subscription.go b/dot/rpc/subscription/subscription.go index 870faf5410..37612c6068 100644 --- a/dot/rpc/subscription/subscription.go +++ b/dot/rpc/subscription/subscription.go @@ -6,8 +6,9 @@ import ( "strconv" ) +// RPC methods const ( - authorSubmitAndWatchExtrinsic string = "author_submitAndWatchExtrinsic" //nolint + authorSubmitAndWatchExtrinsic string = "author_submitAndWatchExtrinsic" chainSubscribeNewHeads string = "chain_subscribeNewHeads" chainSubscribeNewHead string = "chain_subscribeNewHead" chainSubscribeFinalizedHeads string = "chain_subscribeFinalizedHeads" diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index f407ef195e..7f91d82477 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -29,6 +29,7 @@ import ( "sync/atomic" "github.com/ChainSafe/gossamer/dot/rpc/modules" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" log "github.com/ChainSafe/log15" @@ -58,8 +59,7 @@ type WSConn struct { CoreAPI modules.CoreAPI TxStateAPI modules.TransactionStateAPI RPCHost string - - HTTP httpclient + HTTP httpclient } // readWebsocketMessage will read and parse the message data to a string->interface{} data @@ -70,7 +70,7 @@ func (c *WSConn) readWebsocketMessage() ([]byte, map[string]interface{}, error) return nil, nil, errCannotReadFromWebsocket } - logger.Trace("websocket received", "message", mbytes) + logger.Trace("websocket received", "message", string(mbytes)) // determine if request is for subscribe method type var msg map[string]interface{} @@ -104,14 +104,14 @@ func (c *WSConn) HandleComm() { logger.Debug("ws method called", "method", method, "params", params) if !strings.Contains(method, "_unsubscribe") && !strings.Contains(method, "_unwatch") { - setup := c.getSetupListener(method) + setupListener := c.getSetupListener(method) - if setup == nil { + if setupListener == nil { c.executeRPCCall(mbytes) continue } - listener, err := setup(reqid, params) //nolint + listener, err := setupListener(reqid, params) //nolint if err != nil { logger.Warn("failed to create listener", "method", method, "error", err) continue @@ -234,7 +234,7 @@ func (c *WSConn) initBlockListener(reqID float64, _ interface{}) (Listener, erro } func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Listener, error) { - bfl := &BlockFinalizedListener{ + blockFinalizedListener := &BlockFinalizedListener{ cancel: make(chan struct{}, 1), done: make(chan struct{}, 1), cancelTimeout: defaultCancelTimeout, @@ -246,19 +246,19 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Liste return nil, fmt.Errorf("error BlockAPI not set") } - bfl.channel = c.BlockAPI.GetFinalisedNotifierChannel() + blockFinalizedListener.channel = c.BlockAPI.GetFinalisedNotifierChannel() c.mu.Lock() - bfl.subID = atomic.AddUint32(&c.qtyListeners, 1) - c.Subscriptions[bfl.subID] = bfl + blockFinalizedListener.subID = atomic.AddUint32(&c.qtyListeners, 1) + c.Subscriptions[blockFinalizedListener.subID] = blockFinalizedListener c.mu.Unlock() - initRes := NewSubscriptionResponseJSON(bfl.subID, reqID) + initRes := NewSubscriptionResponseJSON(blockFinalizedListener.subID, reqID) c.safeSend(initRes) - return bfl, nil + return blockFinalizedListener, nil } func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener, error) { @@ -283,42 +283,51 @@ func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener, func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener, error) { pA := params.([]interface{}) + + if len(pA) != 1 { + return nil, errors.New("expecting only one parameter") + } + + // The passed parameter should be a HEX of a SCALE encoded extrinsic extBytes, err := common.HexToBytes(pA[0].(string)) if err != nil { return nil, err } - // listen for built blocks - esl := NewExtrinsicSubmitListener(c, extBytes) - if c.BlockAPI == nil { return nil, fmt.Errorf("error BlockAPI not set") } - esl.importedChan = c.BlockAPI.GetImportedBlockNotifierChannel() + txStatusChan := c.TxStateAPI.GetStatusNotifierChannel(extBytes) + importedChan := c.BlockAPI.GetImportedBlockNotifierChannel() + finalizedChan := c.BlockAPI.GetFinalisedNotifierChannel() - esl.finalisedChan = c.BlockAPI.GetFinalisedNotifierChannel() + extSubmitListener := NewExtrinsicSubmitListener( + c, + extBytes, + importedChan, + txStatusChan, + finalizedChan, + ) c.mu.Lock() - - esl.subID = atomic.AddUint32(&c.qtyListeners, 1) - c.Subscriptions[esl.subID] = esl - + extSubmitListener.subID = atomic.AddUint32(&c.qtyListeners, 1) + c.Subscriptions[extSubmitListener.subID] = extSubmitListener c.mu.Unlock() err = c.CoreAPI.HandleSubmittedExtrinsic(extBytes) - if err != nil { + if errors.Is(err, runtime.ErrInvalidTransaction) || errors.Is(err, runtime.ErrUnknownTransaction) { + c.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, extSubmitListener.subID, "invalid")) + return nil, err + } else if err != nil { c.safeSendError(reqID, nil, err.Error()) return nil, err } - c.safeSend(NewSubscriptionResponseJSON(esl.subID, reqID)) - // TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue - // should we add a channel to tx queue so we're notified when it's in the queue (#1535) - c.safeSend(newSubscriptionResponse(authorExtrinsicUpdatesMethod, esl.subID, "ready")) + c.safeSend(NewSubscriptionResponseJSON(extSubmitListener.subID, reqID)) // todo (ed) determine which peer extrinsic has been broadcast to, and set status (#1535) - return esl, err + return extSubmitListener, err } func (c *WSConn) initRuntimeVersionListener(reqID float64, _ interface{}) (Listener, error) { diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index cb759b44d6..3bab7d8b43 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -7,12 +7,13 @@ import ( "time" "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks" + "github.com/ChainSafe/gossamer/pkg/scale" "github.com/ChainSafe/gossamer/dot/rpc/modules" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/grandpa" - "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/ChainSafe/gossamer/lib/runtime" "github.com/gorilla/websocket" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -195,27 +196,36 @@ func TestWSConn_HandleComm(t *testing.T) { // test initExtrinsicWatch wsconn.CoreAPI = modules.NewMockCoreAPI() wsconn.BlockAPI = nil - res, err = wsconn.initExtrinsicWatch(0, []interface{}{"NotHex"}) + wsconn.TxStateAPI = modules.NewMockTransactionStateAPI() + listner, err := wsconn.initExtrinsicWatch(0, []interface{}{"NotHex"}) require.EqualError(t, err, "could not byteify non 0x prefixed string") - require.Nil(t, res) + require.Nil(t, listner) - res, err = wsconn.initExtrinsicWatch(0, []interface{}{"0x26aa"}) + listner, err = wsconn.initExtrinsicWatch(0, []interface{}{"0x26aa"}) require.EqualError(t, err, "error BlockAPI not set") - require.Nil(t, res) + require.Nil(t, listner) wsconn.BlockAPI = modules.NewMockBlockAPI() - res, err = wsconn.initExtrinsicWatch(0, []interface{}{"0x26aa"}) + listner, err = wsconn.initExtrinsicWatch(0, []interface{}{"0x26aa"}) require.NoError(t, err) - require.NotNil(t, res) + require.NotNil(t, listner) require.Len(t, wsconn.Subscriptions, 8) _, msg, err = c.ReadMessage() require.NoError(t, err) require.Equal(t, `{"jsonrpc":"2.0","result":8,"id":0}`+"\n", string(msg)) + // test initExtrinsicWatch with invalid transaction + coreAPI := new(mocks.CoreAPI) + coreAPI.On("HandleSubmittedExtrinsic", mock.AnythingOfType("types.Extrinsic")).Return(runtime.ErrInvalidTransaction) + wsconn.CoreAPI = coreAPI + listner, err = wsconn.initExtrinsicWatch(0, []interface{}{"0xa9018400d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d019e91c8d44bf01ffe36d54f9e43dade2b2fc653270a0e002daed1581435c2e1755bc4349f1434876089d99c9dac4d4128e511c2a3e0788a2a74dd686519cb7c83000000000104ab"}) + require.Error(t, err) + require.Nil(t, listner) + _, msg, err = c.ReadMessage() require.NoError(t, err) - require.Equal(t, `{"jsonrpc":"2.0","method":"author_extrinsicUpdate","params":{"result":"ready","subscription":8}}`+"\n", string(msg)) + require.Equal(t, `{"jsonrpc":"2.0","method":"author_extrinsicUpdate","params":{"result":"invalid","subscription":9}}`+"\n", string(msg)) mockedJust := grandpa.Justification{ Round: 1, @@ -228,6 +238,7 @@ func TestWSConn_HandleComm(t *testing.T) { mockedJustBytes, err := scale.Marshal(mockedJust) require.NoError(t, err) + wsconn.CoreAPI = modules.NewMockCoreAPI() BlockAPI := new(mocks.BlockAPI) fCh := make(chan *types.FinalisationInfo, 5) @@ -243,7 +254,7 @@ func TestWSConn_HandleComm(t *testing.T) { _, msg, err = c.ReadMessage() require.NoError(t, err) - require.Equal(t, `{"jsonrpc":"2.0","result":9,"id":0}`+"\n", string(msg)) + require.Equal(t, `{"jsonrpc":"2.0","result":10,"id":0}`+"\n", string(msg)) listener.Listen() header := &types.Header{ @@ -256,7 +267,7 @@ func TestWSConn_HandleComm(t *testing.T) { time.Sleep(time.Second * 2) - expected := `{"jsonrpc":"2.0","method":"grandpa_justifications","params":{"result":"%s","subscription":9}}` + "\n" + expected := `{"jsonrpc":"2.0","method":"grandpa_justifications","params":{"result":"%s","subscription":10}}` + "\n" expected = fmt.Sprintf(expected, common.BytesToHex(mockedJustBytes)) _, msg, err = c.ReadMessage() require.NoError(t, err) diff --git a/dot/rpc/websocket_test.go b/dot/rpc/websocket_test.go index 8d378552b0..baac17e17a 100644 --- a/dot/rpc/websocket_test.go +++ b/dot/rpc/websocket_test.go @@ -54,18 +54,22 @@ func TestHTTPServer_ServeHTTP(t *testing.T) { sysAPI := system.NewService(si, nil) bAPI := modules.NewMockBlockAPI() sAPI := modules.NewMockStorageAPI() + + TxStateAPI := modules.NewMockTransactionStateAPI() + cfg := &HTTPServerConfig{ - Modules: []string{"system", "chain"}, - RPCExternal: false, - RPCPort: 8545, - WSPort: 8546, - WS: true, - WSExternal: false, - RPCAPI: NewService(), - CoreAPI: coreAPI, - SystemAPI: sysAPI, - BlockAPI: bAPI, - StorageAPI: sAPI, + Modules: []string{"system", "chain"}, + RPCExternal: false, + RPCPort: 8545, + WSPort: 8546, + WS: true, + WSExternal: false, + RPCAPI: NewService(), + CoreAPI: coreAPI, + SystemAPI: sysAPI, + BlockAPI: bAPI, + StorageAPI: sAPI, + TransactionQueueAPI: TxStateAPI, } s := NewHTTPServer(cfg) diff --git a/dot/state/transaction.go b/dot/state/transaction.go index ff9fb5d075..73e0d6e0c0 100644 --- a/dot/state/transaction.go +++ b/dot/state/transaction.go @@ -1,6 +1,8 @@ package state import ( + "sync" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/transaction" @@ -10,18 +12,25 @@ import ( type TransactionState struct { queue *transaction.PriorityQueue pool *transaction.Pool + + // notifierChannels are used to notify transaction status. It maps a channel to + // hex string of the extrinsic it is supposed to notify about. + notifierChannels map[chan transaction.Status]string + notifierLock sync.RWMutex } // NewTransactionState returns a new TransactionState func NewTransactionState() *TransactionState { return &TransactionState{ - queue: transaction.NewPriorityQueue(), - pool: transaction.NewPool(), + queue: transaction.NewPriorityQueue(), + pool: transaction.NewPool(), + notifierChannels: make(map[chan transaction.Status]string), } } // Push pushes a transaction to the queue, ordered by priority func (s *TransactionState) Push(vt *transaction.ValidTransaction) (common.Hash, error) { + s.notifyStatus(vt.Extrinsic, transaction.Ready) return s.queue.Push(vt) } @@ -58,5 +67,50 @@ func (s *TransactionState) RemoveExtrinsicFromPool(ext types.Extrinsic) { // AddToPool adds a transaction to the pool func (s *TransactionState) AddToPool(vt *transaction.ValidTransaction) common.Hash { + s.notifyStatus(vt.Extrinsic, transaction.Future) return s.pool.Insert(vt) } + +// GetStatusNotifierChannel creates and returns a status notifier channel. +func (s *TransactionState) GetStatusNotifierChannel(ext types.Extrinsic) chan transaction.Status { + s.notifierLock.Lock() + defer s.notifierLock.Unlock() + + ch := make(chan transaction.Status, DEFAULT_BUFFER_SIZE) + s.notifierChannels[ch] = ext.String() + return ch +} + +// FreeStatusNotifierChannel deletes given status notifier channel from our map. +func (s *TransactionState) FreeStatusNotifierChannel(ch chan transaction.Status) { + s.notifierLock.Lock() + defer s.notifierLock.Unlock() + + delete(s.notifierChannels, ch) +} + +func (s *TransactionState) notifyStatus(ext types.Extrinsic, status transaction.Status) { + s.notifierLock.Lock() + defer s.notifierLock.Unlock() + + if len(s.notifierChannels) == 0 { + return + } + + var wg sync.WaitGroup + for ch, extrinsicStrWithCh := range s.notifierChannels { + if extrinsicStrWithCh != ext.String() { + continue + } + wg.Add(1) + go func(ch chan transaction.Status) { + defer wg.Done() + + select { + case ch <- status: + default: + } + }(ch) + } + wg.Wait() +} diff --git a/dot/state/transaction_test.go b/dot/state/transaction_test.go index 299e380fcf..d60276ff34 100644 --- a/dot/state/transaction_test.go +++ b/dot/state/transaction_test.go @@ -1,9 +1,12 @@ package state import ( + "math/rand" "sort" "testing" + "time" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/transaction" @@ -59,3 +62,54 @@ func TestTransactionState_Pending(t *testing.T) { head := ts.Peek() require.Nil(t, head) } + +func TestTransactionState_NotifierChannels(t *testing.T) { + ts := NewTransactionState() + + ext := types.Extrinsic{} + notifierChannel := ts.GetStatusNotifierChannel(ext) + defer ts.FreeStatusNotifierChannel(notifierChannel) + + // number of "future" status updates + var futureCount int + // number of "ready" status updates + var readyCount int + + rand.Seed(time.Now().UnixNano()) + + // In practice, one won't see ready and future in this order. This is merely + // meant to check that notifier channels work as expected. + expectedFutureCount := rand.Intn(10) + 10 + expectedReadyCount := rand.Intn(5) + 5 + + dummyTransactions := make([]*transaction.ValidTransaction, expectedFutureCount) + + for i := 0; i < expectedFutureCount; i++ { + dummyTransactions[i] = &transaction.ValidTransaction{ + Extrinsic: ext, + Validity: transaction.NewValidity(0, [][]byte{{}}, [][]byte{{}}, 0, false), + } + + ts.AddToPool(dummyTransactions[i]) + } + + for i := 0; i < expectedReadyCount; i++ { + ts.Push(dummyTransactions[i]) + } + + // it takes time for the status updates to happen + time.Sleep(1 * time.Second) + close(notifierChannel) + + for status := range notifierChannel { + if status.String() == transaction.Future.String() { + futureCount++ + } + if status.String() == transaction.Ready.String() { + readyCount++ + } + } + + require.Equal(t, expectedFutureCount, futureCount) + require.Equal(t, expectedReadyCount, readyCount) +} diff --git a/lib/transaction/types.go b/lib/transaction/types.go index 6d538ab37f..9e5b4cefee 100644 --- a/lib/transaction/types.go +++ b/lib/transaction/types.go @@ -53,3 +53,91 @@ func NewValidTransaction(extrinsic types.Extrinsic, validity *Validity) *ValidTr Validity: validity, } } + +// // StatusNotification represents information about a transaction status update. +// type StatusNotification struct { +// Ext types.Extrinsic +// Status string +// PeersBroadcastedTo []string +// BlockHash *common.Hash +// } + +//nolint +// Status represents possible transaction statuses. +// +// The status events can be grouped based on their kinds as: +// 1. Entering/Moving within the pool: +// - `Future` +// - `Ready` +// 2. Inside `Ready` queue: +// - `Broadcast` +// 3. Leaving the pool: +// - `InBlock` +// - `Invalid` +// - `Usurped` +// - `Dropped` +// 4. Re-entering the pool: +// - `Retracted` +// 5. Block finalized: +// - `Finalized` +// - `FinalityTimeout` +type Status int64 + +const ( + // Future status occurs when transaction is part of the future queue. + Future Status = iota + // Ready status occurs when transaction is part of the ready queue. + Ready + // Broadcast status occurs when transaction has been broadcast to the given peers. + Broadcast + // InBlock status occurs when transaction has been included in block with given + // hash. + InBlock + // Retracted status occurs when the block this transaction was included in has + // been retracted. + Retracted + // FinalityTimeout status occurs when the maximum number of finality watchers + // has been reached, + // old watchers are being removed. + FinalityTimeout + //nolint + // Finalized status occurs when transaction has been finalized by a finality-gadget, + // e.g GRANDPA + Finalized + // Usurped status occurs when transaction has been replaced in the pool, by another + // transaction that provides the same tags. (e.g. same (sender, nonce)). + Usurped + // Dropped status occurs when transaction has been dropped from the pool because + // of the limit. + Dropped + // Invalid status occurs when transaction is no longer valid in the current state. + Invalid +) + +// String returns string representation of current status. +func (s Status) String() string { + switch s { + case Future: + return "future" + case Ready: + return "ready" + case Broadcast: + return "broadcast" + case InBlock: + return "inBlock" + case Retracted: + return "retracted" + case FinalityTimeout: + return "finalityTimeout" + //nolint + case Finalized: + return "finalized" + case Usurped: + return "usurped" + case Dropped: + return "dropped" + case Invalid: + return "invalid" + } + return "unknown" +} diff --git a/tests/stress/stress_test.go b/tests/stress/stress_test.go index 7cf34ed850..a23aab699f 100644 --- a/tests/stress/stress_test.go +++ b/tests/stress/stress_test.go @@ -33,6 +33,7 @@ import ( gsrpc "github.com/centrifuge/go-substrate-rpc-client/v3" "github.com/centrifuge/go-substrate-rpc-client/v3/signature" "github.com/centrifuge/go-substrate-rpc-client/v3/types" + "github.com/gorilla/websocket" "github.com/stretchr/testify/require" ) @@ -470,3 +471,91 @@ func TestSync_SubmitExtrinsic(t *testing.T) { hashes, err := compareBlocksByNumberWithRetry(t, nodes, extInBlock.String()) require.NoError(t, err, hashes) } + +func Test_SubmitAndWatchExtrinsic(t *testing.T) { + t.Log("starting gossamer...") + + // index of node to submit tx to + idx := 0 // TODO: randomise this + + // start block producing node first + node, err := utils.RunGossamer(t, 0, utils.TestDir(t, utils.KeyList[0]), utils.GenesisDev, utils.ConfigNoGrandpa, true, true) + require.NoError(t, err) + nodes := []*utils.Node{node} + + defer func() { + t.Log("going to tear down gossamer...") + errList := utils.StopNodes(t, nodes) + require.Len(t, errList, 0) + }() + + // send tx to non-authority node + api, err := gsrpc.NewSubstrateAPI(fmt.Sprintf("ws://localhost:%s", nodes[idx].WSPort)) + require.NoError(t, err) + + meta, err := api.RPC.State.GetMetadataLatest() + require.NoError(t, err) + + c, err := types.NewCall(meta, "System.remark", []byte{0xab}) + require.NoError(t, err) + + // Create the extrinsic + ext := types.NewExtrinsic(c) + + genesisHash, err := api.RPC.Chain.GetBlockHash(0) + require.NoError(t, err) + + rv, err := api.RPC.State.GetRuntimeVersionLatest() + require.NoError(t, err) + + key, err := types.CreateStorageKey(meta, "System", "Account", signature.TestKeyringPairAlice.PublicKey, nil) + require.NoError(t, err) + + var accInfo types.AccountInfo + ok, err := api.RPC.State.GetStorageLatest(key, &accInfo) + require.NoError(t, err) + require.True(t, ok) + + o := types.SignatureOptions{ + BlockHash: genesisHash, + Era: types.ExtrinsicEra{IsImmortalEra: true}, + GenesisHash: genesisHash, + Nonce: types.NewUCompactFromUInt(uint64(accInfo.Nonce)), + SpecVersion: rv.SpecVersion, + Tip: types.NewUCompactFromUInt(0), + TransactionVersion: rv.TransactionVersion, + } + + // Sign the transaction using Alice's default account + err = ext.Sign(signature.TestKeyringPairAlice, o) + require.NoError(t, err) + + extEnc, err := types.EncodeToHexString(ext) + require.NoError(t, err) + + conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8546", nil) + if err != nil { + fmt.Println(err) + } + + message := []byte(`{"id":1, "jsonrpc":"2.0", "method": "author_submitAndWatchExtrinsic", "params":["` + extEnc + `"]}`) + + err = conn.WriteMessage(websocket.TextMessage, message) + require.NoError(t, err) + + var result []byte + _, result, err = conn.ReadMessage() + require.NoError(t, err) + require.Equal(t, "{\"jsonrpc\":\"2.0\",\"result\":1,\"id\":1}\n", string(result)) + + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + + _, result, err = conn.ReadMessage() + require.NoError(t, err) + require.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":\"ready\",\"subscription\":1}}\n", string(result)) + + _, result, err = conn.ReadMessage() + require.NoError(t, err) + require.Contains(t, string(result), "{\"jsonrpc\":\"2.0\",\"method\":\"author_extrinsicUpdate\",\"params\":{\"result\":{\"inBlock\":\"") + +}