From accaf69ab639cad03f53db2d30bfcc754920479c Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Tue, 13 Apr 2021 19:44:19 +0530 Subject: [PATCH] feat(dot/network): Add cache for network message. (#1511) * Add cache for network message. --- dot/network/host.go | 20 +++++++-- dot/network/host_test.go | 6 ++- dot/network/message_cache.go | 73 ++++++++++++++++++++++++++++++ dot/network/message_cache_test.go | 74 +++++++++++++++++++++++++++++++ dot/network/notifications.go | 24 ++++++---- dot/network/service_test.go | 69 ++++++++++++++++++++++++++++ dot/network/test_helpers.go | 7 +-- go.mod | 1 + 8 files changed, 259 insertions(+), 15 deletions(-) create mode 100644 dot/network/message_cache.go create mode 100644 dot/network/message_cache_test.go diff --git a/dot/network/host.go b/dot/network/host.go index 8b1e4d25da..12c4a386ba 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -23,6 +23,7 @@ import ( "path" "time" + "github.com/dgraph-io/ristretto" badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p" libp2phost "github.com/libp2p/go-libp2p-core/host" @@ -57,13 +58,11 @@ type host struct { protocolID protocol.ID cm *ConnManager ds *badger.Datastore + messageCache *messageCache } // newHost creates a host wrapper with a new libp2p host instance func newHost(ctx context.Context, cfg *Config) (*host, error) { - // use "p2p" for multiaddress format - ma.SwapToP2pMultiaddrs() - // create multiaddress (without p2p identity) addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port)) if err != nil { @@ -154,6 +153,20 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { // wrap host and DHT service with routed host h = rhost.Wrap(h, dht) + cacheSize := 64 << 20 // 64 MB + config := ristretto.Config{ + NumCounters: int64(float64(cacheSize) * 0.05 * 2), + MaxCost: int64(float64(cacheSize) * 0.95), + BufferItems: 64, + Cost: func(value interface{}) int64 { + return int64(1) + }, + } + msgCache, err := newMessageCache(config, msgCacheTTL) + if err != nil { + return nil, err + } + host := &host{ ctx: ctx, h: h, @@ -163,6 +176,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { cm: cm, ds: ds, persistentPeers: pps, + messageCache: msgCache, } cm.host = host diff --git a/dot/network/host_test.go b/dot/network/host_test.go index aca5f2313e..1bd3aec938 100644 --- a/dot/network/host_test.go +++ b/dot/network/host_test.go @@ -222,7 +222,11 @@ func TestSend(t *testing.T) { require.NoError(t, err) time.Sleep(TestMessageTimeout) - require.Equal(t, testBlockRequestMessage, handler.messages[nodeA.host.id()]) + + msg, ok := handler.messages[nodeA.host.id()] + require.True(t, ok) + require.Equal(t, 1, len(msg)) + require.Equal(t, testBlockRequestMessage, msg[0]) } // test host send method with existing stream diff --git a/dot/network/message_cache.go b/dot/network/message_cache.go new file mode 100644 index 0000000000..53565001a1 --- /dev/null +++ b/dot/network/message_cache.go @@ -0,0 +1,73 @@ +package network + +import ( + "errors" + "time" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/dgraph-io/ristretto" + "github.com/libp2p/go-libp2p-core/peer" +) + +// msgCacheTTL is default duration a key-value will be stored in messageCache. +var msgCacheTTL = 5 * time.Minute + +// messageCache is used to detect duplicated messages per peer. +type messageCache struct { + cache *ristretto.Cache + ttl time.Duration +} + +// newMessageCache creates a new messageCache which takes config and TTL duration. +func newMessageCache(config ristretto.Config, ttl time.Duration) (*messageCache, error) { + cache, err := ristretto.NewCache(&config) + if err != nil { + return nil, err + } + + if ttl == 0 { + ttl = msgCacheTTL + } + + return &messageCache{cache: cache, ttl: ttl}, nil +} + +// put appends peer ID and message data and stores it in cache with TTL. +func (m *messageCache) put(peer peer.ID, msg NotificationsMessage) (bool, error) { + key, err := generateCacheKey(peer, msg) + if err != nil { + return false, err + } + + _, ok := m.cache.Get(key) + if ok { + return false, nil + } + + ok = m.cache.SetWithTTL(key, "", 1, m.ttl) + return ok, nil +} + +// exists checks if exist in cache. +func (m *messageCache) exists(peer peer.ID, msg NotificationsMessage) bool { + key, err := generateCacheKey(peer, msg) + if err != nil { + return false + } + + _, ok := m.cache.Get(key) + return ok +} + +func generateCacheKey(peer peer.ID, msg NotificationsMessage) ([]byte, error) { + if msg.IsHandshake() { + return nil, errors.New("cache does not support handshake messages") + } + + peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg.Hash().ToBytes()...)) + if err != nil { + return nil, err + } + + return peerMsgHash.ToBytes(), nil +} diff --git a/dot/network/message_cache_test.go b/dot/network/message_cache_test.go new file mode 100644 index 0000000000..7acf50fce2 --- /dev/null +++ b/dot/network/message_cache_test.go @@ -0,0 +1,74 @@ +package network + +import ( + "math/big" + "testing" + "time" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/dgraph-io/ristretto" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" +) + +func TestMessageCache(t *testing.T) { + cacheSize := 64 << 20 // 64 MB + msgCache, err := newMessageCache(ristretto.Config{ + NumCounters: int64(float64(cacheSize) * 0.05 * 2), + MaxCost: int64(float64(cacheSize) * 0.95), + BufferItems: 64, + Cost: func(value interface{}) int64 { + return int64(1) + }, + }, 800*time.Millisecond) + require.NoError(t, err) + + peerID := peer.ID("gossamer") + msg := &BlockAnnounceMessage{ + ParentHash: common.Hash{1}, + Number: big.NewInt(77), + StateRoot: common.Hash{2}, + ExtrinsicsRoot: common.Hash{3}, + Digest: types.Digest{}, + } + + ok, err := msgCache.put(peerID, msg) + require.NoError(t, err) + require.True(t, ok) + + time.Sleep(750 * time.Millisecond) + + ok = msgCache.exists(peerID, msg) + require.True(t, ok) + + time.Sleep(50 * time.Millisecond) + + ok = msgCache.exists(peerID, msg) + require.False(t, ok) +} + +func TestMessageCacheError(t *testing.T) { + cacheSize := 64 << 20 // 64 MB + msgCache, err := newMessageCache(ristretto.Config{ + NumCounters: int64(float64(cacheSize) * 0.05 * 2), + MaxCost: int64(float64(cacheSize) * 0.95), + BufferItems: 64, + Cost: func(value interface{}) int64 { + return int64(1) + }, + }, 800*time.Millisecond) + require.NoError(t, err) + + peerID := peer.ID("gossamer") + msg := &BlockAnnounceHandshake{ + Roles: 4, + BestBlockNumber: 77, + BestBlockHash: common.Hash{1}, + GenesisHash: common.Hash{2}, + } + + ok, err := msgCache.put(peerID, msg) + require.Error(t, err, "cache does not support handshake messages") + require.False(t, ok) +} diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 98760d6e18..9d36695379 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -225,19 +225,14 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer peers := s.host.peers() rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) - for i, peer := range peers { // TODO: check if stream is open, if not, open and send handshake - // TODO: configure this and determine ideal ratio, as well as when to use broadcast vs gossip - if i > len(peers)/3 { - return - } + info.mapMu.RLock() + defer info.mapMu.RUnlock() + for _, peer := range peers { // TODO: check if stream is open, if not, open and send handshake if peer == excluding { continue } - info.mapMu.RLock() - defer info.mapMu.RUnlock() - if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { info.handshakeData.Store(peer, &handshakeData{ validated: false, @@ -247,6 +242,19 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs) err = s.host.send(peer, info.protocolID, hs) } else { + if s.host.messageCache != nil { + var added bool + added, err = s.host.messageCache.put(peer, msg) + if err != nil { + logger.Error("failed to add message to cache", "peer", peer, "error", err) + continue + } + + if !added { + continue + } + } + // we've already completed the handshake with the peer, send message directly logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg) err = s.host.send(peer, info.protocolID, msg) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 4f9743fc1f..a63b446398 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -161,6 +161,75 @@ func TestBroadcastMessages(t *testing.T) { require.NotNil(t, handler.messages[nodeA.host.id()]) } +func TestBroadcastDuplicateMessage(t *testing.T) { + msgCacheTTL = 2 * time.Second + + basePathA := utils.NewTestBasePath(t, "nodeA") + configA := &Config{ + BasePath: basePathA, + Port: 7001, + RandSeed: 1, + NoBootstrap: true, + NoMDNS: true, + } + + nodeA := createTestService(t, configA) + defer nodeA.Stop() + nodeA.noGossip = true + + basePathB := utils.NewTestBasePath(t, "nodeB") + configB := &Config{ + BasePath: basePathB, + Port: 7002, + RandSeed: 2, + NoBootstrap: true, + NoMDNS: true, + } + + nodeB := createTestService(t, configB) + defer nodeB.Stop() + nodeB.noGossip = true + + handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder) + nodeB.host.registerStreamHandler(blockAnnounceID, handler.handleStream) + + addrInfosB, err := nodeB.host.addrInfos() + require.NoError(t, err) + + protocol := nodeA.notificationsProtocols[BlockAnnounceMsgType] + protocol.handshakeData.Store(nodeB.host.id(), &handshakeData{ + received: true, + validated: true, + }) + + err = nodeA.host.connect(*addrInfosB[0]) + // retry connect if "failed to dial" error + if failedToDial(err) { + time.Sleep(TestBackoffTimeout) + err = nodeA.host.connect(*addrInfosB[0]) + } + require.NoError(t, err) + + // Only one message will be sent. + for i := 0; i < 5; i++ { + nodeA.SendMessage(testBlockAnnounceMessage) + } + + time.Sleep(time.Millisecond * 200) + require.Equal(t, 1, len(handler.messages[nodeA.host.id()])) + + nodeA.host.messageCache = nil + + // All 5 message will be sent since cache is disabled. + for i := 0; i < 5; i++ { + nodeA.SendMessage(testBlockAnnounceMessage) + require.NoError(t, err) + } + + time.Sleep(time.Millisecond * 200) + require.Equal(t, 6, len(handler.messages[nodeA.host.id()])) +} + func TestService_NodeRoles(t *testing.T) { basePath := utils.NewTestBasePath(t, "node") cfg := &Config{ diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index 03851ac325..7659867268 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -70,13 +70,13 @@ func (s *mockSyncer) SetSyncing(syncing bool) { } type testStreamHandler struct { - messages map[peer.ID]Message + messages map[peer.ID][]Message decoder messageDecoder } func newTestStreamHandler(decoder messageDecoder) *testStreamHandler { return &testStreamHandler{ - messages: make(map[peer.ID]Message), + messages: make(map[peer.ID][]Message), decoder: decoder, } } @@ -93,7 +93,8 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) { } func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error { - s.messages[stream.Conn().RemotePeer()] = msg + msgs := s.messages[stream.Conn().RemotePeer()] + s.messages[stream.Conn().RemotePeer()] = append(msgs, msg) return nil } diff --git a/go.mod b/go.mod index e9ee6e8434..e16612d2e1 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/cosmos/go-bip39 v1.0.0 github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect github.com/dgraph-io/badger/v2 v2.2007.2 // indirect + github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de github.com/disiqueira/gotree v1.0.0 github.com/docker/docker v1.13.1 github.com/elastic/gosigar v0.14.0 // indirect