Skip to content

Commit

Permalink
Implement time based batch processing of transcation message.
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed Nov 2, 2021
1 parent becec9e commit 6e5e885
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 93 deletions.
5 changes: 5 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

// defaultTxnBatchSize is the default size for the transaction batch
defaultTxnBatchSize = 100
)

// DefaultBootnodes the default value for Config.Bootnodes
Expand Down Expand Up @@ -104,6 +107,8 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
3 changes: 2 additions & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/lib/utils"
)

func TestMaxPeers(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,17 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) {
func (h *host) protocols() []string {
return h.h.Mux().Protocols()
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() == pID {
err := st.Close()
if err != nil {
logger.Trace("Failed to close stream", "protocol", pID, "error", err)
}
}
}
}
}
36 changes: 10 additions & 26 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type (
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error)
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
Expand Down Expand Up @@ -221,40 +221,24 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}
seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
}

return nil
}
}
Expand Down
66 changes: 36 additions & 30 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestHandshake_SizeOf(t *testing.T) {
Expand Down Expand Up @@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand All @@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
Expand Down Expand Up @@ -411,11 +407,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
Expand Down
17 changes: 9 additions & 8 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
"sync"
"time"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
)

const (
Expand Down Expand Up @@ -100,8 +101,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -138,6 +137,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -176,7 +178,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
Expand Down Expand Up @@ -224,7 +225,7 @@ func (s *Service) Start() error {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
Expand Down
7 changes: 5 additions & 2 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/lib/utils"
)

var TestProtocolID = "/gossamer/test/0"
Expand Down Expand Up @@ -65,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

Expand All @@ -83,7 +86,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {

if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}
Expand Down
Loading

0 comments on commit 6e5e885

Please sign in to comment.