From 6e5e885c02ac84edd9dae63393f84ab1e52453e1 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 2 Nov 2021 14:50:53 +0530 Subject: [PATCH] Implement time based batch processing of transcation message. --- dot/network/config.go | 5 +++ dot/network/connmgr_test.go | 3 +- dot/network/host.go | 14 ++++++ dot/network/notifications.go | 36 +++++----------- dot/network/notifications_test.go | 66 +++++++++++++++------------- dot/network/service.go | 17 ++++---- dot/network/service_test.go | 7 ++- dot/network/transaction.go | 71 ++++++++++++++++++++----------- 8 files changed, 126 insertions(+), 93 deletions(-) diff --git a/dot/network/config.go b/dot/network/config.go index 8946a740d70..52186671db9 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -52,6 +52,9 @@ const ( // DefaultDiscoveryInterval is the default interval for searching for DHT peers DefaultDiscoveryInterval = time.Minute * 5 + + // defaultTxnBatchSize is the default size for the transaction batch + defaultTxnBatchSize = 100 ) // DefaultBootnodes the default value for Config.Bootnodes @@ -104,6 +107,8 @@ type Config struct { telemetryInterval time.Duration noPreAllocate bool // internal option + + batchSize int // internal option } // build checks the configuration, sets up the private key for the network service, diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index 50e0661b80e..7af3e1d153a 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -21,9 +21,10 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/lib/utils" ) func TestMaxPeers(t *testing.T) { diff --git a/dot/network/host.go b/dot/network/host.go index dcca60e3e2f..6258e247eb9 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -396,3 +396,17 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) { func (h *host) protocols() []string { return h.h.Mux().Protocols() } + +func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) { + connToPeer := h.h.Network().ConnsToPeer(p) + for _, c := range connToPeer { + for _, st := range c.GetStreams() { + if st.Protocol() == pID { + err := st.Close() + if err != nil { + logger.Trace("Failed to close stream", "protocol", pID, "error", err) + } + } + } + } +} diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 7ba62745e7a..b6001ef0932 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -57,7 +57,7 @@ type ( NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error) // NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode. - NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error) + NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) ) // BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go @@ -221,40 +221,24 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, "peer", stream.Conn().RemotePeer(), ) - var ( - propagate bool - err error - msgs []*BatchMessage - ) if batchHandler != nil { - msgs, err = batchHandler(peer, msg) - if err != nil { - return err - } + batchHandler(peer, msg) + return nil + } - propagate = len(msgs) > 0 - } else { - propagate, err = messageHandler(peer, msg) - if err != nil { - return err - } - msgs = append(msgs, &BatchMessage{ - msg: msg, - peer: peer, - }) + propagate, err := messageHandler(peer, msg) + if err != nil { + return err } if !propagate || s.noGossip { return nil } - for _, data := range msgs { - seen := s.gossip.hasSeen(data.msg) - if !seen { - s.broadcastExcluding(info, data.peer, data.msg) - } + seen := s.gossip.hasSeen(msg) + if !seen { + s.broadcastExcluding(info, peer, msg) } - return nil } } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index cfb8803bb43..f66804eeb3b 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -25,13 +25,13 @@ import ( "time" "unsafe" - "github.com/ChainSafe/gossamer/dot/types" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/utils" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/utils" ) func TestHandshake_SizeOf(t *testing.T) { @@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) { } func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { + const batchSize = 5 basePath := utils.NewTestBasePath(t, "nodeA") - mockhandler := &MockTransactionHandler{} - mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) - mockhandler.On("TransactionsCount").Return(0) config := &Config{ - BasePath: basePath, - Port: 7001, - NoBootstrap: true, - NoMDNS: true, - TransactionHandler: mockhandler, + BasePath: basePath, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + batchSize: batchSize, } - s := createTestService(t, config) - s.batchSize = 5 + srvc1 := createTestService(t, config) configB := &Config{ BasePath: utils.NewTestBasePath(t, "nodeB"), @@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { NoMDNS: true, } - b := createTestService(t, configB) + srvc2 := createTestService(t, configB) - txnBatch := make(chan *BatchMessage, s.batchSize) - txnBatchHandler := s.createBatchMessageHandler(txnBatch) - - // don't set handshake data ie. this stream has just been opened - testPeerID := b.host.id() + txnBatch := make(chan *BatchMessage, batchSize) + txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch) // connect nodes - addrInfoB := b.host.addrInfo() - err := s.host.connect(addrInfoB) + addrInfoB := srvc2.host.addrInfo() + err := srvc1.host.connect(addrInfoB) if failedToDial(err) { time.Sleep(TestBackoffTimeout) - err = s.host.connect(addrInfoB) + err = srvc1.host.connect(addrInfoB) + require.NoError(t, err) } require.NoError(t, err) - stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID) + txnProtocolID := srvc1.host.protocolID + transactionsID + stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID) require.NoError(t, err) - require.Len(t, txnBatch, 0) // create info and handler info := ¬ificationsProtocol{ - protocolID: s.host.protocolID + transactionsID, - getHandshake: s.getTransactionHandshake, + protocolID: txnProtocolID, + getHandshake: srvc1.getTransactionHandshake, handshakeValidator: validateTransactionHandshake, inboundHandshakeData: new(sync.Map), outboundHandshakeData: new(sync.Map), } - handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler) + handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler) // set handshake data to received - info.inboundHandshakeData.Store(testPeerID, handshakeData{ + info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{ received: true, validated: true, }) + msg := &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } @@ -411,11 +407,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) { } err = handler(stream, msg) require.NoError(t, err) - require.Len(t, txnBatch, 0) + require.Len(t, txnBatch, 5) + + // reached batch size limit, below transaction will not be included in batch. + msg = &TransactionMessage{ + Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, + } + err = handler(stream, msg) + require.NoError(t, err) + require.Len(t, txnBatch, 5) msg = &TransactionMessage{ Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}}, } + // wait for transaction batch channel to process. + time.Sleep(1300 * time.Millisecond) err = handler(stream, msg) require.NoError(t, err) require.Len(t, txnBatch, 1) diff --git a/dot/network/service.go b/dot/network/service.go index c1924868596..2cc55d204a8 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -25,15 +25,16 @@ import ( "sync" "time" - gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" - "github.com/ChainSafe/gossamer/dot/telemetry" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/services" log "github.com/ChainSafe/log15" "github.com/ethereum/go-ethereum/metrics" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" + "github.com/ChainSafe/gossamer/dot/telemetry" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/services" ) const ( @@ -100,8 +101,6 @@ type Service struct { blockResponseBuf []byte blockResponseBufMu sync.Mutex - - batchSize int } // NewService creates a new network service from the configuration and message channels @@ -138,6 +137,9 @@ func NewService(cfg *Config) (*Service, error) { connectToPeersTimeout = cfg.DiscoveryInterval } + if cfg.batchSize == 0 { + cfg.batchSize = defaultTxnBatchSize + } // create a new host instance host, err := newHost(ctx, cfg) if err != nil { @@ -176,7 +178,6 @@ func NewService(cfg *Config) (*Service, error) { bufPool: bufPool, streamManager: newStreamManager(ctx), blockResponseBuf: make([]byte, maxBlockResponseSize), - batchSize: 100, } return network, err @@ -224,7 +225,7 @@ func (s *Service) Start() error { logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err) } - txnBatch := make(chan *BatchMessage, s.batchSize) + txnBatch := make(chan *BatchMessage, s.cfg.batchSize) txnBatchHandler := s.createBatchMessageHandler(txnBatch) // register transactions protocol diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 60746050099..ae3dbddba11 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -24,9 +24,10 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/lib/utils" ) var TestProtocolID = "/gossamer/test/0" @@ -65,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service { // helper method to create and start a new network service func createTestService(t *testing.T, cfg *Config) (srvc *Service) { + t.Helper() + if cfg == nil { basePath := utils.NewTestBasePath(t, "node") @@ -83,7 +86,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { if cfg.TransactionHandler == nil { mocktxhandler := &MockTransactionHandler{} - mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil) + mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 0e4cead9fc9..1d67c0abd28 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -19,12 +19,13 @@ package network import ( "errors" "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/peer" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/pkg/scale" - - "github.com/libp2p/go-libp2p-core/peer" ) var ( @@ -119,36 +120,54 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) { return &transactionHandshake{}, nil } -func (s *Service) createBatchMessageHandler(txnBatch chan *BatchMessage) NotificationsMessageBatchHandler { - return func(peer peer.ID, msg NotificationsMessage) (msgs []*BatchMessage, err error) { +func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler { + go func() { + protocolID := s.host.protocolID + transactionsID + ticker := time.NewTicker(1 * time.Second) + + for { + out: + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + innerTicker := time.NewTicker(300 * time.Millisecond) + for { + select { + case <-innerTicker.C: + break out + case txnMsg := <-txnBatchCh: + propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg) + if err != nil { + s.host.closeProtocolStream(protocolID, txnMsg.peer) + continue + } + + if s.noGossip || !propagate { + continue + } + + if !s.gossip.hasSeen(txnMsg.msg) { + s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg) + } + } + } + } + } + }() + + return func(peer peer.ID, msg NotificationsMessage) { data := &BatchMessage{ msg: msg, peer: peer, } - txnBatch <- data - - if len(txnBatch) < s.batchSize { - return nil, nil - } - var propagateMsgs []*BatchMessage - for txnData := range txnBatch { - propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg) - if err != nil { - continue - } - if propagate { - propagateMsgs = append(propagateMsgs, &BatchMessage{ - msg: txnData.msg, - peer: txnData.peer, - }) - } - if len(txnBatch) == 0 { - break - } + select { + case txnBatchCh <- data: + case <-time.After(time.Millisecond * 200): + logger.Debug("transaction message not included into batch") + return } - // May be use error to compute peer score. - return propagateMsgs, nil } }