Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): Add cache for network message. #1511

Merged
merged 7 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"time"

"github.com/dgraph-io/ristretto"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -57,13 +58,11 @@ type host struct {
protocolID protocol.ID
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
}

// newHost creates a host wrapper with a new libp2p host instance
func newHost(ctx context.Context, cfg *Config) (*host, error) {
// use "p2p" for multiaddress format
ma.SwapToP2pMultiaddrs()

// create multiaddress (without p2p identity)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port))
if err != nil {
Expand Down Expand Up @@ -154,6 +153,20 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
// wrap host and DHT service with routed host
h = rhost.Wrap(h, dht)

cacheSize := 64 << 20 // 64 MB
config := ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
noot marked this conversation as resolved.
Show resolved Hide resolved
Cost: func(value interface{}) int64 {
return int64(1)
},
}
msgCache, err := newMessageCache(config, msgCacheTTL)
if err != nil {
return nil, err
}

host := &host{
ctx: ctx,
h: h,
Expand All @@ -163,6 +176,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
cm: cm,
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
}

cm.host = host
Expand Down
6 changes: 5 additions & 1 deletion dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ func TestSend(t *testing.T) {
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
require.Equal(t, testBlockRequestMessage, handler.messages[nodeA.host.id()])

msg, ok := handler.messages[nodeA.host.id()]
require.True(t, ok)
require.Equal(t, 1, len(msg))
require.Equal(t, testBlockRequestMessage, msg[0])
}

// test host send method with existing stream
Expand Down
68 changes: 68 additions & 0 deletions dot/network/message_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package network

import (
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/dgraph-io/ristretto"
"github.com/libp2p/go-libp2p-core/peer"
)

// msgCacheTTL is default duration a key-value will be stored in messageCache.
var msgCacheTTL = 5 * time.Minute

// messageCache is used to detect duplicated messages per peer.
type messageCache struct {
cache *ristretto.Cache
ttl time.Duration
}

// newMessageCache creates a new messageCache which takes config and TTL duration.
func newMessageCache(config ristretto.Config, ttl time.Duration) (*messageCache, error) {
cache, err := ristretto.NewCache(&config)
if err != nil {
return nil, err
}

if ttl == 0 {
ttl = msgCacheTTL
}

return &messageCache{cache: cache, ttl: ttl}, nil
}

// Put appends peer ID and message data and stores it in cache with TTL.
func (m *messageCache) Put(peer peer.ID, msg []byte) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be unexported also

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

key, err := generateCacheKey(peer, msg)
if err != nil {
return false, err
}

_, ok := m.cache.Get(key)
if ok {
return false, nil
}

ok = m.cache.SetWithTTL(key, "", 1, m.ttl)
return ok, nil
}

// Exists checks if <peer ID, message data> exist in cache.
func (m *messageCache) Exists(peer peer.ID, msg []byte) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

key, err := generateCacheKey(peer, msg)
if err != nil {
return false
}

_, ok := m.cache.Get(key)
return ok
}

func generateCacheKey(peer peer.ID, msg []byte) ([]byte, error) {
peerMsgHash, err := common.Blake2bHash(append([]byte(peer), msg...))
if err != nil {
return nil, err
}

return peerMsgHash.ToBytes(), nil
}
40 changes: 40 additions & 0 deletions dot/network/message_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package network

import (
"testing"
"time"

"github.com/dgraph-io/ristretto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)

func TestMessageCache(t *testing.T) {
cacheSize := 64 << 20 // 64 MB
msgCache, err := newMessageCache(ristretto.Config{
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Cost: func(value interface{}) int64 {
return int64(1)
},
}, 800*time.Millisecond)

peerID := peer.ID("gossamer")
msgData := []byte("testData")
require.NoError(t, err)

ok, err := msgCache.Put(peerID, msgData)
require.NoError(t, err)
require.True(t, ok)

time.Sleep(750 * time.Millisecond)

ok = msgCache.Exists(peerID, msgData)
require.True(t, ok)

time.Sleep(50 * time.Millisecond)

ok = msgCache.Exists(peerID, msgData)
require.False(t, ok)
}
27 changes: 19 additions & 8 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,19 +225,14 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
peers := s.host.peers()
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })

for i, peer := range peers { // TODO: check if stream is open, if not, open and send handshake
// TODO: configure this and determine ideal ratio, as well as when to use broadcast vs gossip
if i > len(peers)/3 {
return
}
info.mapMu.RLock()
defer info.mapMu.RUnlock()

for _, peer := range peers { // TODO: check if stream is open, if not, open and send handshake
if peer == excluding {
continue
}

info.mapMu.RLock()
defer info.mapMu.RUnlock()

if hsData, has := info.getHandshakeData(peer); !has || !hsData.received {
info.handshakeData.Store(peer, &handshakeData{
validated: false,
Expand All @@ -247,6 +242,22 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
err = s.host.send(peer, info.protocolID, hs)
} else {
if s.host.messageCache != nil {
Comment on lines 244 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put this into one line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may lead to code duplication.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?

var encMsg []byte
encMsg, err = msg.Encode()
if err != nil {
logger.Error("failed to encode message", "peer", peer, "error", err)
continue
}

var added bool
added, err = s.host.messageCache.Put(peer, encMsg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be nice to update Put to take Message, and encode the message inside Put so it doesn't need to be done here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do that. I was trying to make it generic so it can be used for other messages as well.

if err != nil || !added {
logger.Error("failed to add message to cache", "peer", peer, "error", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will Put error if the key already exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It will return false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, it will log the error if the message is already in the cache, maybe remove the !added case for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

continue
}
}

// we've already completed the handshake with the peer, send message directly
logger.Trace("sending message", "protocol", info.protocolID, "peer", peer, "message", msg)
err = s.host.send(peer, info.protocolID, msg)
Expand Down
69 changes: 69 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,75 @@ func TestBroadcastMessages(t *testing.T) {
require.NotNil(t, handler.messages[nodeA.host.id()])
}

func TestBroadcastDuplicateMessage(t *testing.T) {
msgCacheTTL = 2 * time.Second

basePathA := utils.NewTestBasePath(t, "nodeA")
configA := &Config{
BasePath: basePathA,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}

nodeA := createTestService(t, configA)
defer nodeA.Stop()
nodeA.noGossip = true

basePathB := utils.NewTestBasePath(t, "nodeB")
configB := &Config{
BasePath: basePathB,
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}

nodeB := createTestService(t, configB)
defer nodeB.Stop()
nodeB.noGossip = true

handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder)
nodeB.host.registerStreamHandler(blockAnnounceID, handler.handleStream)

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

protocol := nodeA.notificationsProtocols[BlockAnnounceMsgType]
protocol.handshakeData.Store(nodeB.host.id(), &handshakeData{
received: true,
validated: true,
})

err = nodeA.host.connect(*addrInfosB[0])
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
}
require.NoError(t, err)

// Only one message will be sent.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 1, len(handler.messages[nodeA.host.id()]))

nodeA.host.messageCache = nil

// All 5 message will be sent since cache is disabled.
for i := 0; i < 5; i++ {
nodeA.SendMessage(testBlockAnnounceMessage)
require.NoError(t, err)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))
}

func TestService_NodeRoles(t *testing.T) {
basePath := utils.NewTestBasePath(t, "node")
cfg := &Config{
Expand Down
7 changes: 4 additions & 3 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (s *mockSyncer) SetSyncing(syncing bool) {
}

type testStreamHandler struct {
messages map[peer.ID]Message
messages map[peer.ID][]Message
decoder messageDecoder
}

func newTestStreamHandler(decoder messageDecoder) *testStreamHandler {
return &testStreamHandler{
messages: make(map[peer.ID]Message),
messages: make(map[peer.ID][]Message),
decoder: decoder,
}
}
Expand All @@ -93,7 +93,8 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
s.messages[stream.Conn().RemotePeer()] = msg
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/cosmos/go-bip39 v1.0.0
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de
github.com/disiqueira/gotree v1.0.0
github.com/docker/docker v1.13.1
github.com/elastic/gosigar v0.14.0 // indirect
Expand Down