Skip to content

Commit

Permalink
feat(dot/network): Add cache for network message. (#1511)
Browse files Browse the repository at this point in the history
* Add cache for network message.
  • Loading branch information
arijitAD authored Apr 13, 2021
1 parent 271f92e commit accaf69
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 15 deletions.
20 changes: 17 additions & 3 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"time"

"github.com/dgraph-io/ristretto"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -57,13 +58,11 @@ type host struct {
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
}

// newHost creates a host wrapper with a new libp2p host instance
func newHost(ctx context.Context, cfg *Config) (*host, error) {
// use "p2p" for multiaddress format
ma.SwapToP2pMultiaddrs()

// create multiaddress (without p2p identity)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port))
if err != nil {
Expand Down Expand Up @@ -154,6 +153,20 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// wrap host and DHT service with routed host
h = rhost.Wrap(h, dht)

cacheSize := 64 << 20 // 64 MB
config := ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Cost: func(value interface{}) int64 {
return int64(1)
},
}
msgCache, err := newMessageCache(config, msgCacheTTL)
if err != nil {
return nil, err
}

host := &host{
ctx: ctx,
h: h,
Expand All @@ -163,6 +176,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
cm: cm,
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
}

cm.host = host
Expand Down
6 changes: 5 additions & 1 deletion dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ func TestSend(t *testing.T) {
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
require.Equal(t, testBlockRequestMessage, handler.messages[nodeA.host.id()])

msg, ok := handler.messages[nodeA.host.id()]
require.True(t, ok)
require.Equal(t, 1, len(msg))
require.Equal(t, testBlockRequestMessage, msg[0])
}

// test host send method with existing stream
Expand Down
73 changes: 73 additions & 0 deletions dot/network/message_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package network

import (
"errors"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/dgraph-io/ristretto"
"github.com/libp2p/go-libp2p-core/peer"
)

// msgCacheTTL is default duration a key-value will be stored in messageCache.
var msgCacheTTL = 5 * time.Minute

// messageCache is used to detect duplicated messages per peer.
type messageCache struct {
cache *ristretto.Cache
ttl time.Duration
}

// newMessageCache creates a new messageCache which takes config and TTL duration.
func newMessageCache(config ristretto.Config, ttl time.Duration) (*messageCache, error) {
cache, err := ristretto.NewCache(&config)
if err != nil {
return nil, err
}

if ttl == 0 {
ttl = msgCacheTTL
}

return &messageCache{cache: cache, ttl: ttl}, nil
}

// put appends peer ID and message data and stores it in cache with TTL.
func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error) {
key, err := generateCacheKey(peer, msg)
if err != nil {
return false, err
}

_, ok := m.cache.Get(key)
if ok {
return false, nil
}

ok = m.cache.SetWithTTL(key, "", 1, m.ttl)
return ok, nil
}

// exists checks if <peer ID, message> exist in cache.
func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool {
key, err := generateCacheKey(peer, msg)
if err != nil {
return false
}

_, ok := m.cache.Get(key)
return ok
}

func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) {
if msg.IsHandshake() {
return nil, errors.New("cache does not support handshake messages")
}

peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...))
if err != nil {
return nil, err
}

return peerMsgHash.ToBytes(), nil
}
74 changes: 74 additions & 0 deletions dot/network/message_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package network

import (
"math/big"
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/dgraph-io/ristretto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)

func TestMessageCache(t *testing.T) {
cacheSize := 64 << 20 // 64 MB
msgCache, err := newMessageCache(ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Cost: func(value interface{}) int64 {
return int64(1)
},
}, 800*time.Millisecond)
require.NoError(t, err)

peerID := peer.ID("gossamer")
msg := &BlockAnnounceMessage{
ParentHash: common.Hash{1},
Number: big.NewInt(77),
StateRoot: common.Hash{2},
ExtrinsicsRoot: common.Hash{3},
Digest: types.Digest{},
}

ok, err := msgCache.put(peerID, msg)
require.NoError(t, err)
require.True(t, ok)

time.Sleep(750 * time.Millisecond)

ok = msgCache.exists(peerID, msg)
require.True(t, ok)

time.Sleep(50 * time.Millisecond)

ok = msgCache.exists(peerID, msg)
require.False(t, ok)
}

func TestMessageCacheError(t *testing.T) {
cacheSize := 64 << 20 // 64 MB
msgCache, err := newMessageCache(ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Cost: func(value interface{}) int64 {
return int64(1)
},
}, 800*time.Millisecond)
require.NoError(t, err)

peerID := peer.ID("gossamer")
msg := &BlockAnnounceHandshake{
Roles: 4,
BestBlockNumber: 77,
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
}

ok, err := msgCache.put(peerID, msg)
require.Error(t, err, "cache does not support handshake messages")
require.False(t, ok)
}
24 changes: 16 additions & 8 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,19 +225,14 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
peers := s.host.peers()
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })

for i, peer := range peers { // TODO: check if stream is open, if not, open and send handshake
// TODO: configure this and determine ideal ratio, as well as when to use broadcast vs gossip
if i > len(peers)/3 {
return
}
info.mapMu.RLock()
defer info.mapMu.RUnlock()

for _, peer := range peers { // TODO: check if stream is open, if not, open and send handshake
if peer == excluding {
continue
}

info.mapMu.RLock()
defer info.mapMu.RUnlock()

if hsData, has := info.getHandshakeData(peer); !has || !hsData.received {
info.handshakeData.Store(peer, &handshakeData{
validated: false,
Expand All @@ -247,6 +242,19 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
err = s.host.send(peer, info.protocolID, hs)
} else {
if s.host.messageCache != nil {
var added bool
added, err = s.host.messageCache.put(peer, msg)
if err != nil {
logger.Error("failed to add message to cache", "peer", peer, "error", err)
continue
}

if !added {
continue
}
}

// we've already completed the handshake with the peer, send message directly
logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg)
err = s.host.send(peer, info.protocolID, msg)
Expand Down
69 changes: 69 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,75 @@ func TestBroadcastMessages(t *testing.T) {
require.NotNil(t, handler.messages[nodeA.host.id()])
}

func TestBroadcastDuplicateMessage(t *testing.T) {
msgCacheTTL = 2 * time.Second

basePathA := utils.NewTestBasePath(t, "nodeA")
configA := &Config{
BasePath: basePathA,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}

nodeA := createTestService(t, configA)
defer nodeA.Stop()
nodeA.noGossip = true

basePathB := utils.NewTestBasePath(t, "nodeB")
configB := &Config{
BasePath: basePathB,
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}

nodeB := createTestService(t, configB)
defer nodeB.Stop()
nodeB.noGossip = true

handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder)
nodeB.host.registerStreamHandler(blockAnnounceID, handler.handleStream)

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

protocol := nodeA.notificationsProtocols[BlockAnnounceMsgType]
protocol.handshakeData.Store(nodeB.host.id(), &handshakeData{
received: true,
validated: true,
})

err = nodeA.host.connect(*addrInfosB[0])
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
}
require.NoError(t, err)

// Only one message will be sent.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 1, len(handler.messages[nodeA.host.id()]))

nodeA.host.messageCache = nil

// All 5 message will be sent since cache is disabled.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
require.NoError(t, err)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))
}

func TestService_NodeRoles(t *testing.T) {
basePath := utils.NewTestBasePath(t, "node")
cfg := &Config{
Expand Down
7 changes: 4 additions & 3 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (s *mockSyncer) SetSyncing(syncing bool) {
}

type testStreamHandler struct {
messages map[peer.ID]Message
messages map[peer.ID][]Message
decoder messageDecoder
}

func newTestStreamHandler(decoder messageDecoder) *testStreamHandler {
return &testStreamHandler{
messages: make(map[peer.ID]Message),
messages: make(map[peer.ID][]Message),
decoder: decoder,
}
}
Expand All @@ -93,7 +93,8 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
s.messages[stream.Conn().RemotePeer()] = msg
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cosmos/go-bip39 v1.0.0
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/disiqueira/gotree v1.0.0
github.com/docker/docker v1.13.1
github.com/elastic/gosigar v0.14.0 // indirect
Expand Down

0 comments on commit accaf69

Please sign in to comment.