Skip to content

Commit

Permalink
fix(dot/network): Implement time based handle transaction (ChainSafe#…
Browse files Browse the repository at this point in the history
…1942)

* Implement time-based batch processing of transaction messages.
  • Loading branch information
arijitAD authored and timwu20 committed Dec 6, 2021
1 parent 6c095e4 commit 72455d1
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 122 deletions.
10 changes: 9 additions & 1 deletion dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"path"
"time"

"github.com/ChainSafe/gossamer/internal/log"
"github.com/libp2p/go-libp2p-core/crypto"

"github.com/ChainSafe/gossamer/internal/log"
)

const (
Expand Down Expand Up @@ -39,6 +40,8 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

defaultTxnBatchSize = 100
)

// DefaultBootnodes the default value for Config.Bootnodes
Expand Down Expand Up @@ -93,6 +96,11 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option

// SlotDuration is the slot duration to produce a block
SlotDuration time.Duration
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
4 changes: 2 additions & 2 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/lib/utils"
)

func newTestDiscovery(t *testing.T, num int) []*discovery {
Expand Down
15 changes: 15 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,18 @@ func (h *host) protocols() []string {
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() != pID {
continue
}
err := st.Close()
if err != nil {
logger.Tracef("Failed to close stream for protocol %s: %s", pID, err)
}
}
}
}
56 changes: 19 additions & 37 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"

"github.com/libp2p/go-libp2p-core/mux"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/ChainSafe/gossamer/dot/peerset"
)

const handshakeTimeout = time.Second * 10
Expand All @@ -42,10 +42,9 @@ type (
// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake)
// message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (
batchMsgs []*BatchMessage, err error)
// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications
// stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
Expand Down Expand Up @@ -223,47 +222,30 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
logger.Tracef("received message on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
if !s.gossip.hasSeen(msg) {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}
}
Expand Down
69 changes: 36 additions & 33 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestCreateDecoder_BlockAnnounce(t *testing.T) {
Expand Down Expand Up @@ -302,23 +302,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage",
mock.AnythingOfType("peer.ID"),
mock.AnythingOfType("*network.TransactionMessage")).
Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand All @@ -327,42 +321,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, &handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
Expand Down Expand Up @@ -396,11 +389,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
Expand Down
17 changes: 9 additions & 8 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

const (
Expand Down Expand Up @@ -89,8 +90,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -125,6 +124,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -162,7 +164,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s *Service) Start() error {
blockAnnounceID, err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
Expand Down
8 changes: 6 additions & 2 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

Expand All @@ -73,12 +75,14 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage",
mock.AnythingOfType("peer.ID"),
mock.AnythingOfType("*TransactionMessage")).
Return(nil)
mock.AnythingOfType("*network.TransactionMessage")).
Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}

cfg.SlotDuration = time.Second

cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0"

if cfg.LogLvl == 0 {
Expand Down
5 changes: 3 additions & 2 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"io"
"math/big"

"github.com/stretchr/testify/mock"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/variadic"
"github.com/stretchr/testify/mock"

libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewMockTransactionHandler() *MockTransactionHandler {
mocktxhandler.On("HandleTransactionMessage",
mock.AnythingOfType("peer.ID"),
mock.AnythingOfType("*network.TransactionMessage")).
Return(nil)
Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
return mocktxhandler
}
Expand Down
Loading

0 comments on commit 72455d1

Please sign in to comment.