Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/core): Batch process transaction message. #1780

Merged
merged 16 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ type (

// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*batchMessage, err error)
)

type batchMessage struct {
msg NotificationsMessage
peer peer.ID
}

type handshakeReader struct {
hs Handshake
err error
Expand Down Expand Up @@ -119,7 +127,7 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode
}
}

func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler) messageHandler {
func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, messageHandler NotificationsMessageHandler, batchHandler NotificationsMessageBatchHandler) messageHandler {
return func(stream libp2pnetwork.Stream, m Message) error {
if m == nil || info == nil || info.handshakeValidator == nil || messageHandler == nil {
return nil
Expand Down Expand Up @@ -188,18 +196,38 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

propagate, err := messageHandler(peer, msg)
if err != nil {
return err
var (
propagate bool
err error
msgs []*batchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &batchMessage{
msg: msg,
peer: peer,
})
}

if !propagate || s.noGossip {
return nil
}

seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}
}

return nil
Expand Down
107 changes: 105 additions & 2 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -141,7 +142,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounce(t *testing.T) {
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage)
handler := s.createNotificationsMessageHandler(info, s.handleBlockAnnounceMessage, nil)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand Down Expand Up @@ -313,3 +314,105 @@ func Test_HandshakeTimeout(t *testing.T) {
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 0)
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
}

s := createTestService(t, config)
s.batchSize = 5

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
NoBootstrap: true,
NoMDNS: true,
}

b := createTestService(t, configB)

txnBatch := make(chan *batchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
received: true,
validated: true,
})
msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 2)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 3)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 4)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
}
11 changes: 10 additions & 1 deletion dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type Service struct {
// telemetry
telemetryInterval time.Duration
closeCh chan interface{}

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -171,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) {
closeCh: make(chan interface{}),
bufPool: bufPool,
streamManager: newStreamManager(ctx),
batchSize: 100,
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -218,12 +221,16 @@ func (s *Service) Start() error {
s.validateBlockAnnounceHandshake,
decodeBlockAnnounceMessage,
s.handleBlockAnnounceMessage,
nil,
false,
)
if err != nil {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
}

txnBatch := make(chan *batchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
err = s.RegisterNotificationsProtocol(
transactionsID,
Expand All @@ -233,6 +240,7 @@ func (s *Service) Start() error {
validateTransactionHandshake,
decodeTransactionMessage,
s.handleTransactionMessage,
txnBatchHandler,
false,
)
if err != nil {
Expand Down Expand Up @@ -421,6 +429,7 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
handshakeValidator HandshakeValidator,
messageDecoder MessageDecoder,
messageHandler NotificationsMessageHandler,
batchHandler NotificationsMessageBatchHandler,
overwriteProtocol bool,
) error {
s.notificationsMu.Lock()
Expand Down Expand Up @@ -471,7 +480,7 @@ func (s *Service) RegisterNotificationsProtocol(sub protocol.ID,
info := s.notificationsProtocols[messageID]

decoder := createDecoder(info, handshakeDecoder, messageDecoder)
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler)
handlerWithValidate := s.createNotificationsMessageHandler(info, messageHandler, batchHandler)

s.host.registerStreamHandlerWithOverwrite(sub, overwriteProtocol, func(stream libp2pnetwork.Stream) {
logger.Trace("received stream", "sub-protocol", sub)
Expand Down
33 changes: 33 additions & 0 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,39 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
return &transactionHandshake{}, nil
}

func (s *Service) createBatchMessageHandler(txnBatch chan *batchMessage) NotificationsMessageBatchHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does txnBatch channel need to be passed in here? seems it's only used within this function, so can it be declared in this function>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, It can be but we need to provide the channel from outside for the test cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could receive a txnBatch chan<- *batchMessage

return func(peer peer.ID, msg NotificationsMessage) (msgs []*batchMessage, err error) {
data := &batchMessage{
msg: msg,
peer: peer,
}
txnBatch <- data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we check if the channel is closed before send data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This channel will never be closed.


if len(txnBatch) < s.batchSize {
return nil, nil
}

var propagateMsgs []*batchMessage
for txnData := range txnBatch {
propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg)
if err != nil {
continue
}
if propagate {
propagateMsgs = append(propagateMsgs, &batchMessage{
msg: txnData.msg,
peer: txnData.peer,
})
}
if len(txnBatch) == 0 {
break
}
}
// May be use error to compute peer score.
return propagateMsgs, nil
}
}

func validateTransactionHandshake(_ peer.ID, _ Handshake) error {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sync

import (
"math/big"
"sync"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -56,8 +57,7 @@ type StorageState interface {
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
LoadCodeHash(*common.Hash) (common.Hash, error)
SetSyncing(bool)
Lock()
Unlock()
sync.Locker
}

// CodeSubstitutedState interface to handle storage of code substitute state
Expand Down
4 changes: 2 additions & 2 deletions lib/babe/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package babe

import (
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
Expand Down Expand Up @@ -52,8 +53,7 @@ type BlockState interface {
// StorageState interface for storage state methods
type StorageState interface {
TrieState(hash *common.Hash) (*rtstorage.TrieState, error)
Lock()
Unlock()
sync.Locker
}

// TransactionState is the interface for transaction queue methods
Expand Down
15 changes: 8 additions & 7 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type GrandpaHandshake struct { //nolint
}

// SubProtocol returns the grandpa sub-protocol
func (hs *GrandpaHandshake) SubProtocol() string {
func (*GrandpaHandshake) SubProtocol() string {
return string(grandpaID)
}

Expand All @@ -79,17 +79,17 @@ func (hs *GrandpaHandshake) Decode(in []byte) error {
}

// Type ...
func (hs *GrandpaHandshake) Type() byte {
func (*GrandpaHandshake) Type() byte {
return 0
}

// Hash ...
func (hs *GrandpaHandshake) Hash() common.Hash {
func (*GrandpaHandshake) Hash() common.Hash {
return common.Hash{}
}

// IsHandshake returns true
func (hs *GrandpaHandshake) IsHandshake() bool {
func (*GrandpaHandshake) IsHandshake() bool {
return true
}

Expand All @@ -101,6 +101,7 @@ func (s *Service) registerProtocol() error {
s.validateHandshake,
s.decodeMessage,
s.handleNetworkMessage,
nil,
true,
)
}
Expand All @@ -119,17 +120,17 @@ func (s *Service) getHandshake() (Handshake, error) {
}, nil
}

func (s *Service) decodeHandshake(in []byte) (Handshake, error) {
func (*Service) decodeHandshake(in []byte) (Handshake, error) {
hs := new(GrandpaHandshake)
err := hs.Decode(in)
return hs, err
}

func (s *Service) validateHandshake(_ peer.ID, _ Handshake) error {
func (*Service) validateHandshake(_ peer.ID, _ Handshake) error {
return nil
}

func (s *Service) decodeMessage(in []byte) (NotificationsMessage, error) {
func (*Service) decodeMessage(in []byte) (NotificationsMessage, error) {
msg := new(network.ConsensusMessage)
err := msg.Decode(in)
return msg, err
Expand Down
Loading